diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java index 2da5e52c7db3c002eab676679f8eeaf970ee4c20..89c4270e55110595f12e201dd2e347c45b54ee1d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java @@ -8,6 +8,7 @@ import java.util.UUID; import kieker.common.logging.Log; import kieker.common.logging.LogFactory; +// BETTER remove the pipeline since it does not add any new functionality public class Pipeline<I, O> implements StageWithPort<I, O> { private final String id; @@ -58,22 +59,20 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { @Override public void executeWithPorts() { - // StageWithPort<?, ?> headStage = this.currentHeads.next(); StageWithPort<?, ?> headStage = this.stages[this.firstStageIndex]; do { headStage.executeWithPorts(); } while (headStage.isReschedulable()); + // headStage.sendFinishedSignalToAllSuccessorStages(); + this.updateRescheduable(headStage); } private final void updateRescheduable(final StageWithPort<?, ?> stage) { StageWithPort<?, ?> currentStage = stage; - while (!currentStage.isReschedulable()) { - // this.currentHeads.remove(currentStage); - // this.currentHeads.addAll(currentStage.getOutputPorts()); - + do { this.firstStageIndex++; // currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port? // if (currentStage == null) { // loop reaches the last stage @@ -84,7 +83,8 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { } currentStage = this.stages[this.firstStageIndex]; currentStage.onIsPipelineHead(); - } + } while (!currentStage.isReschedulable()); + this.setReschedulable(true); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java index 9d4961c5f40dc5c1164f8ceede7b3d2e032ebf44..ab1892e71c3783d5160cc81f9b72b20259717b5c 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java @@ -21,6 +21,9 @@ public class RunnableStage implements Runnable { do { this.stage.executeWithPorts(); } while (this.stage.isReschedulable()); + + // stage.sendFinishedSignalToAllSuccessorStages(); + } catch (RuntimeException e) { this.logger.error("Terminating thread due to the following exception: ", e); throw e;