From 5536972e615e5cc15afbd126ff034947a2661962 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Tue, 3 Mar 2015 19:24:17 +0100 Subject: [PATCH] added currentState to Stage; fixed termination bug in RunnableConsumerStage; added check in StageTester; removed isStarted() in AbstractStage --- .../java/teetime/framework/AbstractStage.java | 33 +++++++++---------- .../teetime/framework/CompositeStage.java | 12 +++---- .../framework/RunnableConsumerStage.java | 10 +++--- src/main/java/teetime/framework/Stage.java | 4 +-- .../java/teetime/framework/StageState.java | 9 +++++ .../teetime/framework/test/OutputHolder.java | 3 +- .../teetime/framework/test/StageTester.java | 7 ++-- 7 files changed, 42 insertions(+), 36 deletions(-) create mode 100644 src/main/java/teetime/framework/StageState.java diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 1c55677f..2037827d 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -31,16 +31,14 @@ public abstract class AbstractStage extends Stage { private final List<InputPort<?>> inputPortList = new ArrayList<InputPort<?>>(); private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>(); + private final Set<ISignal> triggeredSignals = new HashSet<ISignal>(); /** A cached instance of <code>inputPortList</code> to avoid creating an iterator each time iterating it */ protected InputPort<?>[] cachedInputPorts = new InputPort[0]; /** A cached instance of <code>outputPortList</code> to avoid creating an iterator each time iterating it */ protected OutputPort<?>[] cachedOutputPorts; - - private final Set<ISignal> triggeredSignals = new HashSet<ISignal>(); - // BETTER aggregate both states in an enum - private boolean shouldTerminate; - private boolean started; + /** The current state of this stage */ + private StageState currentState = StageState.CREATED; /** * @return the stage's input ports @@ -58,6 +56,11 @@ public abstract class AbstractStage extends Stage { return this.cachedOutputPorts; } + @Override + public StageState getCurrentState() { + return currentState; + } + /** * May not be invoked outside of IPipe implementations */ @@ -73,11 +76,6 @@ public abstract class AbstractStage extends Stage { } } - @Override - public boolean isStarted() { - return started; - } - /** * @param signal * arriving signal @@ -103,6 +101,7 @@ public abstract class AbstractStage extends Stage { @Override public void onValidating(final List<InvalidPortConnection> invalidPortConnections) { this.validateOutputPorts(invalidPortConnections); + currentState = StageState.VALIDATED; } @Override @@ -112,7 +111,7 @@ public abstract class AbstractStage extends Stage { this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]); this.connectUnconnectedOutputPorts(); - started = true; + currentState = StageState.STARTED; logger.debug("Started."); } @@ -129,7 +128,7 @@ public abstract class AbstractStage extends Stage { @Override public void onTerminating() throws Exception { logger.trace("onTerminating: " + this.getId()); - this.terminate(); + currentState = StageState.TERMINATED; } /** @@ -174,17 +173,17 @@ public abstract class AbstractStage extends Stage { } @Override - public void terminate() { - this.shouldTerminate = true; + protected void terminate() { + currentState = StageState.TERMINATING; } @Override - public boolean shouldBeTerminated() { - return this.shouldTerminate; + protected boolean shouldBeTerminated() { + return (currentState == StageState.TERMINATING); } @Override - public TerminationStrategy getTerminationStrategy() { + protected TerminationStrategy getTerminationStrategy() { return TerminationStrategy.BY_SIGNAL; } diff --git a/src/main/java/teetime/framework/CompositeStage.java b/src/main/java/teetime/framework/CompositeStage.java index 403f826f..190b64ce 100644 --- a/src/main/java/teetime/framework/CompositeStage.java +++ b/src/main/java/teetime/framework/CompositeStage.java @@ -73,19 +73,15 @@ public abstract class CompositeStage extends Stage { } @Override - public final void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { - for (final Stage s : getLastStages()) { - s.validateOutputPorts(invalidPortConnections); - } + public final StageState getCurrentState() { + return getFirstStage().getCurrentState(); } @Override - protected final boolean isStarted() { - boolean isStarted = true; + public final void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { for (final Stage s : getLastStages()) { - isStarted = isStarted && s.isStarted(); + s.validateOutputPorts(invalidPortConnections); } - return isStarted; } @Override diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 50bd043c..af0b6c4a 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -71,15 +71,15 @@ final class RunnableConsumerStage extends AbstractRunnableStage { } } - final ISignal signal = new TerminatingSignal(); - for (InputPort<?> inputPort : inputPorts) { - stage.onSignal(signal, inputPort); - } + stage.terminate(); } @Override protected void afterStageExecution(final Stage stage) { - // do nothing + final ISignal signal = new TerminatingSignal(); + for (InputPort<?> inputPort : inputPorts) { + stage.onSignal(signal, inputPort); + } } } diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 08f27d4c..e971d54b 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -106,6 +106,8 @@ public abstract class Stage { protected abstract boolean shouldBeTerminated(); + public abstract StageState getCurrentState(); + public Thread getOwningThread() { return owningThread; } @@ -116,8 +118,6 @@ public abstract class Stage { protected abstract InputPort<?>[] getInputPorts(); - protected abstract boolean isStarted(); - // events public abstract void onValidating(List<InvalidPortConnection> invalidPortConnections); diff --git a/src/main/java/teetime/framework/StageState.java b/src/main/java/teetime/framework/StageState.java new file mode 100644 index 00000000..ae4cc451 --- /dev/null +++ b/src/main/java/teetime/framework/StageState.java @@ -0,0 +1,9 @@ +package teetime.framework; + +public enum StageState { + + CREATED, + VALIDATING, VALIDATED, + STARTING, STARTED, + TERMINATING, TERMINATED +} diff --git a/src/main/java/teetime/framework/test/OutputHolder.java b/src/main/java/teetime/framework/test/OutputHolder.java index 9a5dece0..53f55bd3 100644 --- a/src/main/java/teetime/framework/test/OutputHolder.java +++ b/src/main/java/teetime/framework/test/OutputHolder.java @@ -3,7 +3,6 @@ package teetime.framework.test; import java.util.List; import teetime.framework.OutputPort; -import teetime.framework.Stage; public final class OutputHolder<O> { @@ -13,7 +12,7 @@ public final class OutputHolder<O> { private OutputPort<Object> port; @SuppressWarnings("unchecked") - OutputHolder(final StageTester stageTester, final Stage stage, final List<O> outputList) { + OutputHolder(final StageTester stageTester, final List<O> outputList) { this.stageTester = stageTester; this.outputElements = (List<Object>) outputList; } diff --git a/src/main/java/teetime/framework/test/StageTester.java b/src/main/java/teetime/framework/test/StageTester.java index 2dd7d3b0..6620640b 100644 --- a/src/main/java/teetime/framework/test/StageTester.java +++ b/src/main/java/teetime/framework/test/StageTester.java @@ -23,6 +23,7 @@ import java.util.List; import teetime.framework.Analysis; import teetime.framework.AnalysisConfiguration; import teetime.framework.Stage; +import teetime.framework.StageState; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; @@ -46,6 +47,9 @@ public final class StageTester { } public static StageTester test(final Stage stage) { + if (stage.getCurrentState() != StageState.CREATED) { + throw new AssertionError("This stage has already been tested in this test method. Move this test into a new test method."); + } return new StageTester(stage); } @@ -60,7 +64,7 @@ public final class StageTester { } public <O> OutputHolder<O> receive(final List<O> outputList) { - final OutputHolder<O> outputHolder = new OutputHolder<O>(this, stage, outputList); + final OutputHolder<O> outputHolder = new OutputHolder<O>(this, outputList); this.outputHolders.add(outputHolder); return outputHolder; } @@ -79,7 +83,6 @@ public final class StageTester { public Configuration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) { final IPipeFactory interPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); - for (InputHolder<?> inputHolder : inputHolders) { final IterableProducer<Object> producer = new IterableProducer<Object>(inputHolder.getInput()); interPipeFactory.create(producer.getOutputPort(), inputHolder.getPort()); -- GitLab