diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 1c55677f989e21b349508871bf48c3a91e0c4dd9..2037827ddac789ba63023eb2a7fe1bfde0cea68e 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -31,16 +31,14 @@ public abstract class AbstractStage extends Stage { private final List<InputPort<?>> inputPortList = new ArrayList<InputPort<?>>(); private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>(); + private final Set<ISignal> triggeredSignals = new HashSet<ISignal>(); /** A cached instance of <code>inputPortList</code> to avoid creating an iterator each time iterating it */ protected InputPort<?>[] cachedInputPorts = new InputPort[0]; /** A cached instance of <code>outputPortList</code> to avoid creating an iterator each time iterating it */ protected OutputPort<?>[] cachedOutputPorts; - - private final Set<ISignal> triggeredSignals = new HashSet<ISignal>(); - // BETTER aggregate both states in an enum - private boolean shouldTerminate; - private boolean started; + /** The current state of this stage */ + private StageState currentState = StageState.CREATED; /** * @return the stage's input ports @@ -58,6 +56,11 @@ public abstract class AbstractStage extends Stage { return this.cachedOutputPorts; } + @Override + public StageState getCurrentState() { + return currentState; + } + /** * May not be invoked outside of IPipe implementations */ @@ -73,11 +76,6 @@ public abstract class AbstractStage extends Stage { } } - @Override - public boolean isStarted() { - return started; - } - /** * @param signal * arriving signal @@ -103,6 +101,7 @@ public abstract class AbstractStage extends Stage { @Override public void onValidating(final List<InvalidPortConnection> invalidPortConnections) { this.validateOutputPorts(invalidPortConnections); + currentState = StageState.VALIDATED; } @Override @@ -112,7 +111,7 @@ public abstract class AbstractStage extends Stage { this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]); this.connectUnconnectedOutputPorts(); - started = true; + currentState = StageState.STARTED; logger.debug("Started."); } @@ -129,7 +128,7 @@ public abstract class AbstractStage extends Stage { @Override public void onTerminating() throws Exception { logger.trace("onTerminating: " + this.getId()); - this.terminate(); + currentState = StageState.TERMINATED; } /** @@ -174,17 +173,17 @@ public abstract class AbstractStage extends Stage { } @Override - public void terminate() { - this.shouldTerminate = true; + protected void terminate() { + currentState = StageState.TERMINATING; } @Override - public boolean shouldBeTerminated() { - return this.shouldTerminate; + protected boolean shouldBeTerminated() { + return (currentState == StageState.TERMINATING); } @Override - public TerminationStrategy getTerminationStrategy() { + protected TerminationStrategy getTerminationStrategy() { return TerminationStrategy.BY_SIGNAL; } diff --git a/src/main/java/teetime/framework/CompositeStage.java b/src/main/java/teetime/framework/CompositeStage.java index 403f826fd855a44f16d25e713f5322f71a09cd6a..190b64ce685f4f1448301c9ee79c952399d33e23 100644 --- a/src/main/java/teetime/framework/CompositeStage.java +++ b/src/main/java/teetime/framework/CompositeStage.java @@ -73,19 +73,15 @@ public abstract class CompositeStage extends Stage { } @Override - public final void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { - for (final Stage s : getLastStages()) { - s.validateOutputPorts(invalidPortConnections); - } + public final StageState getCurrentState() { + return getFirstStage().getCurrentState(); } @Override - protected final boolean isStarted() { - boolean isStarted = true; + public final void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { for (final Stage s : getLastStages()) { - isStarted = isStarted && s.isStarted(); + s.validateOutputPorts(invalidPortConnections); } - return isStarted; } @Override diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 50bd043c85addafacc028111e7a2acccfbde743b..af0b6c4a62e8558c124b62cd621f93944b611efd 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -71,15 +71,15 @@ final class RunnableConsumerStage extends AbstractRunnableStage { } } - final ISignal signal = new TerminatingSignal(); - for (InputPort<?> inputPort : inputPorts) { - stage.onSignal(signal, inputPort); - } + stage.terminate(); } @Override protected void afterStageExecution(final Stage stage) { - // do nothing + final ISignal signal = new TerminatingSignal(); + for (InputPort<?> inputPort : inputPorts) { + stage.onSignal(signal, inputPort); + } } } diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 08f27d4c2936771c592a51e02931d2dbc1bafd1f..e971d54b705325a8a61126533dba0d38cdf8c429 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -106,6 +106,8 @@ public abstract class Stage { protected abstract boolean shouldBeTerminated(); + public abstract StageState getCurrentState(); + public Thread getOwningThread() { return owningThread; } @@ -116,8 +118,6 @@ public abstract class Stage { protected abstract InputPort<?>[] getInputPorts(); - protected abstract boolean isStarted(); - // events public abstract void onValidating(List<InvalidPortConnection> invalidPortConnections); diff --git a/src/main/java/teetime/framework/StageState.java b/src/main/java/teetime/framework/StageState.java new file mode 100644 index 0000000000000000000000000000000000000000..ae4cc451c88aa21cd9c27d768aebbd6fa2b616f0 --- /dev/null +++ b/src/main/java/teetime/framework/StageState.java @@ -0,0 +1,9 @@ +package teetime.framework; + +public enum StageState { + + CREATED, + VALIDATING, VALIDATED, + STARTING, STARTED, + TERMINATING, TERMINATED +} diff --git a/src/main/java/teetime/framework/test/OutputHolder.java b/src/main/java/teetime/framework/test/OutputHolder.java index 9a5dece0f8f9665d18834638c8ca4f6f28f89be3..53f55bd3f2c0a93728d485fda1e8455053fdc4a2 100644 --- a/src/main/java/teetime/framework/test/OutputHolder.java +++ b/src/main/java/teetime/framework/test/OutputHolder.java @@ -3,7 +3,6 @@ package teetime.framework.test; import java.util.List; import teetime.framework.OutputPort; -import teetime.framework.Stage; public final class OutputHolder<O> { @@ -13,7 +12,7 @@ public final class OutputHolder<O> { private OutputPort<Object> port; @SuppressWarnings("unchecked") - OutputHolder(final StageTester stageTester, final Stage stage, final List<O> outputList) { + OutputHolder(final StageTester stageTester, final List<O> outputList) { this.stageTester = stageTester; this.outputElements = (List<Object>) outputList; } diff --git a/src/main/java/teetime/framework/test/StageTester.java b/src/main/java/teetime/framework/test/StageTester.java index 2dd7d3b05d9e75555dec4461412b4997574df5bd..6620640b46dbcfe8d3564ef41081b90be562c2e0 100644 --- a/src/main/java/teetime/framework/test/StageTester.java +++ b/src/main/java/teetime/framework/test/StageTester.java @@ -23,6 +23,7 @@ import java.util.List; import teetime.framework.Analysis; import teetime.framework.AnalysisConfiguration; import teetime.framework.Stage; +import teetime.framework.StageState; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; @@ -46,6 +47,9 @@ public final class StageTester { } public static StageTester test(final Stage stage) { + if (stage.getCurrentState() != StageState.CREATED) { + throw new AssertionError("This stage has already been tested in this test method. Move this test into a new test method."); + } return new StageTester(stage); } @@ -60,7 +64,7 @@ public final class StageTester { } public <O> OutputHolder<O> receive(final List<O> outputList) { - final OutputHolder<O> outputHolder = new OutputHolder<O>(this, stage, outputList); + final OutputHolder<O> outputHolder = new OutputHolder<O>(this, outputList); this.outputHolders.add(outputHolder); return outputHolder; } @@ -79,7 +83,6 @@ public final class StageTester { public Configuration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) { final IPipeFactory interPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); - for (InputHolder<?> inputHolder : inputHolders) { final IterableProducer<Object> producer = new IterableProducer<Object>(inputHolder.getInput()); interPipeFactory.create(producer.getOutputPort(), inputHolder.getPort());