Skip to content
Snippets Groups Projects
Commit bf4e8e56 authored by Christian Wulf's avatar Christian Wulf
Browse files

made stage private in AbstractRunnableStage

parent 758c8822
No related branches found
No related tags found
No related merge requests found
......@@ -20,7 +20,7 @@ import org.slf4j.LoggerFactory;
abstract class AbstractRunnableStage implements Runnable {
protected final Stage stage;
private final Stage stage;
@SuppressWarnings("PMD.LoggerIsNotStaticFinal")
protected final Logger logger;
......@@ -35,13 +35,13 @@ abstract class AbstractRunnableStage implements Runnable {
final Stage stage = this.stage;
try {
beforeStageExecution();
beforeStageExecution(stage);
do {
executeStage();
executeStage(stage);
} while (!stage.shouldBeTerminated());
afterStageExecution();
afterStageExecution(stage);
} catch (Error e) {
this.logger.error("Terminating thread due to the following exception: ", e);
......@@ -54,9 +54,9 @@ abstract class AbstractRunnableStage implements Runnable {
this.logger.debug("Finished runnable stage. (" + stage.getId() + ")");
}
protected abstract void beforeStageExecution();
protected abstract void beforeStageExecution(Stage stage);
protected abstract void executeStage();
protected abstract void executeStage(Stage stage);
protected abstract void afterStageExecution();
protected abstract void afterStageExecution(Stage stage);
}
......@@ -43,12 +43,11 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
}
@Override
protected void beforeStageExecution() {
protected void beforeStageExecution(final Stage stage) {
logger.trace("ENTRY beforeStageExecution");
final Stage stage = this.stage;
do {
checkforSignals();
checkforSignals(stage);
Thread.yield();
} while (!stage.isStarted());
......@@ -56,16 +55,16 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
}
@Override
protected void executeStage() {
protected void executeStage(final Stage stage) {
try {
this.stage.executeWithPorts();
stage.executeWithPorts();
} catch (NotEnoughInputException e) {
checkforSignals(); // check for termination
executeIdleStrategy();
checkforSignals(stage); // check for termination
executeIdleStrategy(stage);
}
}
private void executeIdleStrategy() {
private void executeIdleStrategy(final Stage stage) {
if (stage.shouldBeTerminated()) {
return;
}
......@@ -77,8 +76,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
}
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
private void checkforSignals() {
final Stage stage = this.stage;
private void checkforSignals(final Stage stage) {
for (InputPort<?> inputPort : inputPorts) {
final IPipe pipe = inputPort.getPipe();
if (pipe instanceof AbstractInterThreadPipe) { // TODO: is this needed?
......@@ -92,7 +90,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
}
@Override
protected void afterStageExecution() {
protected void afterStageExecution(final Stage stage) {
// do nothing
}
......
......@@ -25,20 +25,20 @@ public final class RunnableProducerStage extends AbstractRunnableStage {
}
@Override
protected void beforeStageExecution() {
protected void beforeStageExecution(final Stage stage) {
final StartingSignal startingSignal = new StartingSignal();
this.stage.onSignal(startingSignal, null);
stage.onSignal(startingSignal, null);
}
@Override
protected void executeStage() {
this.stage.executeWithPorts();
protected void executeStage(final Stage stage) {
stage.executeWithPorts();
}
@Override
protected void afterStageExecution() {
protected void afterStageExecution(final Stage stage) {
final TerminatingSignal terminatingSignal = new TerminatingSignal();
this.stage.onSignal(terminatingSignal, null);
stage.onSignal(terminatingSignal, null);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment