diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java index 96906facb7a1bf0eebb35e376493dd6c4a135ebe..0b26490fbd0ce76deef9dda945606eda9f76277f 100644 --- a/src/main/java/teetime/framework/AbstractRunnableStage.java +++ b/src/main/java/teetime/framework/AbstractRunnableStage.java @@ -20,7 +20,7 @@ import org.slf4j.LoggerFactory; abstract class AbstractRunnableStage implements Runnable { - protected final Stage stage; + private final Stage stage; @SuppressWarnings("PMD.LoggerIsNotStaticFinal") protected final Logger logger; @@ -35,13 +35,13 @@ abstract class AbstractRunnableStage implements Runnable { final Stage stage = this.stage; try { - beforeStageExecution(); + beforeStageExecution(stage); do { - executeStage(); + executeStage(stage); } while (!stage.shouldBeTerminated()); - afterStageExecution(); + afterStageExecution(stage); } catch (Error e) { this.logger.error("Terminating thread due to the following exception: ", e); @@ -54,9 +54,9 @@ abstract class AbstractRunnableStage implements Runnable { this.logger.debug("Finished runnable stage. (" + stage.getId() + ")"); } - protected abstract void beforeStageExecution(); + protected abstract void beforeStageExecution(Stage stage); - protected abstract void executeStage(); + protected abstract void executeStage(Stage stage); - protected abstract void afterStageExecution(); + protected abstract void afterStageExecution(Stage stage); } diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 035ddfc679d8584f32bfc7533fc167ec79039b82..3fc6a173d79b21d0ca75f7e0d4b03bbdf0390854 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -43,12 +43,11 @@ final class RunnableConsumerStage extends AbstractRunnableStage { } @Override - protected void beforeStageExecution() { + protected void beforeStageExecution(final Stage stage) { logger.trace("ENTRY beforeStageExecution"); - final Stage stage = this.stage; do { - checkforSignals(); + checkforSignals(stage); Thread.yield(); } while (!stage.isStarted()); @@ -56,16 +55,16 @@ final class RunnableConsumerStage extends AbstractRunnableStage { } @Override - protected void executeStage() { + protected void executeStage(final Stage stage) { try { - this.stage.executeWithPorts(); + stage.executeWithPorts(); } catch (NotEnoughInputException e) { - checkforSignals(); // check for termination - executeIdleStrategy(); + checkforSignals(stage); // check for termination + executeIdleStrategy(stage); } } - private void executeIdleStrategy() { + private void executeIdleStrategy(final Stage stage) { if (stage.shouldBeTerminated()) { return; } @@ -77,8 +76,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage { } @SuppressWarnings("PMD.DataflowAnomalyAnalysis") - private void checkforSignals() { - final Stage stage = this.stage; + private void checkforSignals(final Stage stage) { for (InputPort<?> inputPort : inputPorts) { final IPipe pipe = inputPort.getPipe(); if (pipe instanceof AbstractInterThreadPipe) { // TODO: is this needed? @@ -92,7 +90,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage { } @Override - protected void afterStageExecution() { + protected void afterStageExecution(final Stage stage) { // do nothing } diff --git a/src/main/java/teetime/framework/RunnableProducerStage.java b/src/main/java/teetime/framework/RunnableProducerStage.java index fe82c9705d5a4981db6f0bf6acc195cd9748330a..5cb6a047f928876c769874f0c6cfcc75d5d2bfc0 100644 --- a/src/main/java/teetime/framework/RunnableProducerStage.java +++ b/src/main/java/teetime/framework/RunnableProducerStage.java @@ -25,20 +25,20 @@ public final class RunnableProducerStage extends AbstractRunnableStage { } @Override - protected void beforeStageExecution() { + protected void beforeStageExecution(final Stage stage) { final StartingSignal startingSignal = new StartingSignal(); - this.stage.onSignal(startingSignal, null); + stage.onSignal(startingSignal, null); } @Override - protected void executeStage() { - this.stage.executeWithPorts(); + protected void executeStage(final Stage stage) { + stage.executeWithPorts(); } @Override - protected void afterStageExecution() { + protected void afterStageExecution(final Stage stage) { final TerminatingSignal terminatingSignal = new TerminatingSignal(); - this.stage.onSignal(terminatingSignal, null); + stage.onSignal(terminatingSignal, null); } }