Skip to content
Snippets Groups Projects
Commit 9e515468 authored by Christian Wulf's avatar Christian Wulf
Browse files

added do/while to pipeline

parent 7e0abed4
No related branches found
No related tags found
No related merge requests found
...@@ -31,6 +31,8 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -31,6 +31,8 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
private boolean reschedulable; private boolean reschedulable;
private int firstStageIndex; private int firstStageIndex;
// private final Set<StageWithPort<?, ?>> currentHeads = new HashSet<StageWithPort<?, ?>>();
public Pipeline() { public Pipeline() {
this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name
this.logger = LogFactory.getLog(this.id); this.logger = LogFactory.getLog(this.id);
...@@ -80,16 +82,23 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -80,16 +82,23 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
@Override @Override
public void executeWithPorts() { public void executeWithPorts() {
this.logger.debug("Executing stage..."); this.logger.debug("Executing stage...");
StageWithPort<?, ?> firstStage = this.stages[this.firstStageIndex];
firstStage.executeWithPorts();
this.updateRescheduable(firstStage); // StageWithPort<?, ?> headStage = this.currentHeads.next();
// this.setReschedulable(stage.isReschedulable()); StageWithPort<?, ?> headStage = this.stages[this.firstStageIndex];
do {
headStage.executeWithPorts();
} while (headStage.isReschedulable());
this.updateRescheduable(headStage);
} }
private final void updateRescheduable(final StageWithPort<?, ?> stage) { private final void updateRescheduable(final StageWithPort<?, ?> stage) {
StageWithPort<?, ?> currentStage = stage; StageWithPort<?, ?> currentStage = stage;
while (!currentStage.isReschedulable()) { while (!currentStage.isReschedulable()) {
// this.currentHeads.remove(currentStage);
// this.currentHeads.addAll(currentStage.getOutputPorts());
this.firstStageIndex++; this.firstStageIndex++;
// currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port? // currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port?
// if (currentStage == null) { // loop reaches the last stage // if (currentStage == null) { // loop reaches the last stage
...@@ -100,7 +109,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -100,7 +109,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
} }
currentStage = this.stages[this.firstStageIndex]; currentStage = this.stages[this.firstStageIndex];
currentStage.onIsPipelineHead(); currentStage.onIsPipelineHead();
// System.out.println("firstStageIndex: " + this.firstStageIndex + ", class:" + stage.getClass().getSimpleName());
} }
this.setReschedulable(true); this.setReschedulable(true);
} }
......
...@@ -40,6 +40,7 @@ import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter; ...@@ -40,6 +40,7 @@ import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter;
* @since 1.10 * @since 1.10
*/ */
public class MethodCallThroughputAnalysis15 extends Analysis { public class MethodCallThroughputAnalysis15 extends Analysis {
// FIXME this analysis sometimes runs infinitely
private static final int SPSC_INITIAL_CAPACITY = 4; private static final int SPSC_INITIAL_CAPACITY = 4;
......
ok
no
inf
inf
inf
no
no
no
ok
no
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment