Skip to content
Snippets Groups Projects
Commit f5a669dc authored by Christian Wulf's avatar Christian Wulf
Browse files

switched while-do to do-while in Pipeline

parent 65cf609c
No related branches found
No related tags found
No related merge requests found
...@@ -8,6 +8,7 @@ import java.util.UUID; ...@@ -8,6 +8,7 @@ import java.util.UUID;
import kieker.common.logging.Log; import kieker.common.logging.Log;
import kieker.common.logging.LogFactory; import kieker.common.logging.LogFactory;
// BETTER remove the pipeline since it does not add any new functionality
public class Pipeline<I, O> implements StageWithPort<I, O> { public class Pipeline<I, O> implements StageWithPort<I, O> {
private final String id; private final String id;
...@@ -58,22 +59,20 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -58,22 +59,20 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
@Override @Override
public void executeWithPorts() { public void executeWithPorts() {
// StageWithPort<?, ?> headStage = this.currentHeads.next();
StageWithPort<?, ?> headStage = this.stages[this.firstStageIndex]; StageWithPort<?, ?> headStage = this.stages[this.firstStageIndex];
do { do {
headStage.executeWithPorts(); headStage.executeWithPorts();
} while (headStage.isReschedulable()); } while (headStage.isReschedulable());
// headStage.sendFinishedSignalToAllSuccessorStages();
this.updateRescheduable(headStage); this.updateRescheduable(headStage);
} }
private final void updateRescheduable(final StageWithPort<?, ?> stage) { private final void updateRescheduable(final StageWithPort<?, ?> stage) {
StageWithPort<?, ?> currentStage = stage; StageWithPort<?, ?> currentStage = stage;
while (!currentStage.isReschedulable()) { do {
// this.currentHeads.remove(currentStage);
// this.currentHeads.addAll(currentStage.getOutputPorts());
this.firstStageIndex++; this.firstStageIndex++;
// currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port? // currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port?
// if (currentStage == null) { // loop reaches the last stage // if (currentStage == null) { // loop reaches the last stage
...@@ -84,7 +83,8 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -84,7 +83,8 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
} }
currentStage = this.stages[this.firstStageIndex]; currentStage = this.stages[this.firstStageIndex];
currentStage.onIsPipelineHead(); currentStage.onIsPipelineHead();
} } while (!currentStage.isReschedulable());
this.setReschedulable(true); this.setReschedulable(true);
} }
......
...@@ -21,6 +21,9 @@ public class RunnableStage implements Runnable { ...@@ -21,6 +21,9 @@ public class RunnableStage implements Runnable {
do { do {
this.stage.executeWithPorts(); this.stage.executeWithPorts();
} while (this.stage.isReschedulable()); } while (this.stage.isReschedulable());
// stage.sendFinishedSignalToAllSuccessorStages();
} catch (RuntimeException e) { } catch (RuntimeException e) {
this.logger.error("Terminating thread due to the following exception: ", e); this.logger.error("Terminating thread due to the following exception: ", e);
throw e; throw e;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment