diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index ad1fc8ceaf4a3ba94a4dbca71a9305bd942d3868..544d55cbaa9fe94e7ea272a0fda6be34af597928 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -38,8 +38,8 @@ public abstract class AbstractCompositeStage { * @param stage * A arbitrary stage, which will be added to the configuration and executed in a thread. */ - protected final void addThreadableStage(final Stage stage) { - this.addThreadableStage(stage, stage.getId()); + protected final void declareActive(final Stage stage) { + this.declareActive(stage, stage.getId()); } /** @@ -50,10 +50,11 @@ public abstract class AbstractCompositeStage { * @param threadName * A string which can be used for debugging. */ - protected void addThreadableStage(final Stage stage, final String threadName) { + protected void declareActive(final Stage stage, final String threadName) { AbstractRunnableStage runnable = AbstractRunnableStage.create(stage); Thread newThread = new TeeTimeThread(runnable, threadName); stage.setOwningThread(newThread); + stage.setActive(true); } /** @@ -85,7 +86,7 @@ public abstract class AbstractCompositeStage { protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { if (sourcePort.getOwningStage().getInputPorts().size() == 0) { if (sourcePort.getOwningStage().getOwningThread() == null) { - addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId()); + declareActive(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId()); } } diff --git a/src/main/java/teetime/framework/Configuration.java b/src/main/java/teetime/framework/Configuration.java index 97e9c344ade98c7c9b65ce50ac0324bb5c77c125..2c10587fe47bc1ae8b4f5051fbc9b6e09ae441b8 100644 --- a/src/main/java/teetime/framework/Configuration.java +++ b/src/main/java/teetime/framework/Configuration.java @@ -56,9 +56,9 @@ public abstract class Configuration extends AbstractCompositeStage { } @Override - protected void addThreadableStage(final Stage stage, final String threadName) { + protected void declareActive(final Stage stage, final String threadName) { startStage = stage; // memorize an arbitrary stage as starting point for traversing - super.addThreadableStage(stage, threadName); + super.declareActive(stage, threadName); } @Override diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index 8c356665d14b5f23352fc9f5eb1369d8f1747dde..054821bbbfb075615425c3baddc183e5d49e4ce9 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -19,7 +19,7 @@ import java.util.Set; /** * Represents a context that is used by a configuration and composite stages to connect ports, for example. - * Stages can be added by executing {@link #addThreadableStage(Stage)}. + * Stages can be added by executing {@link #declareActive(Stage)}. * * @since 2.0 */ diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 4cc0cf7673b735c4641265ac624e68d0204a1970..4a246b5916a804f42838ceb51a0479a02a6ae884 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -50,6 +50,8 @@ public abstract class Stage { /** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */ private Thread owningThread; + private boolean isActive; + private ConfigurationContext owningContext; ConfigurationContext getOwningContext() { @@ -181,4 +183,12 @@ public abstract class Stage { protected abstract void removeDynamicPort(InputPort<?> inputPort); + public boolean isActive() { + return isActive; + } + + void setActive(final boolean isActive) { + this.isActive = isActive; + } + } diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java index 4528ed5c90e621ce93fc0a038b46b36add26eafe..4561e47e186e5956e733fa8e722a3c94a92d3051 100644 --- a/src/main/java/teetime/framework/ThreadService.java +++ b/src/main/java/teetime/framework/ThreadService.java @@ -61,7 +61,7 @@ class ThreadService extends AbstractService<ThreadService> { } void startStageAtRuntime(final Stage newStage) { - configuration.addThreadableStage(newStage); + configuration.declareActive(newStage); Set<Stage> newThreadableStages = initialize(newStage); startThreads(newThreadableStages); diff --git a/src/main/java/teetime/framework/test/StageTester.java b/src/main/java/teetime/framework/test/StageTester.java index fd014645078cecde0955a7c9fb6dac655aa37845..eed58f55c0aa9cb89f68c7f4f15e9e011347541b 100644 --- a/src/main/java/teetime/framework/test/StageTester.java +++ b/src/main/java/teetime/framework/test/StageTester.java @@ -91,7 +91,7 @@ public final class StageTester { connectPorts(producer.getOutputPort(), inputHolder.getPort()); } - addThreadableStage(stage); + declareActive(stage); for (OutputHolder<?> outputHolder : outputHolders) { final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutputElements()); diff --git a/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java b/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java index a214fadc050b675770eaa5d6f8d614df44ad7a98..529a72868b190e3f83be05f73375c546219f825a 100644 --- a/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java +++ b/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java @@ -83,7 +83,7 @@ public class WordCounterConfiguration extends Configuration { connectPorts(distributor.getNewOutputPort(), threadableStage.getInputPort(), 1000); connectPorts(wc.getOutputPort(), merger.getNewInputPort()); // Add WordCounter as a threadable stage, so it runs in its own thread - addThreadableStage(threadableStage.getInputPort().getOwningStage()); + declareActive(threadableStage.getInputPort().getOwningStage()); distributorPorts.add(threadableStage.getInputPort()); mergerPorts.add(wc.getOutputPort()); @@ -95,8 +95,8 @@ public class WordCounterConfiguration extends Configuration { connectPorts(merger.getOutputPort(), result.getInputPort()); // Add the first and last part to the threadable stages - addThreadableStage(init); - addThreadableStage(merger); + declareActive(init); + declareActive(merger); } public MonitoringThread getMonitoringThread() { diff --git a/src/test/java/teetime/framework/AbstractCompositeStageTest.java b/src/test/java/teetime/framework/AbstractCompositeStageTest.java index 18784f4b3686ec4251779bdf86271d6d6c8a87dd..418d999454536be19d70c55b8a3f1a4ac8cbc25d 100644 --- a/src/test/java/teetime/framework/AbstractCompositeStageTest.java +++ b/src/test/java/teetime/framework/AbstractCompositeStageTest.java @@ -27,17 +27,17 @@ public class AbstractCompositeStageTest { @Ignore @Test public void testNestedStages() { - Execution<NestesConfig> exec = new Execution<NestesConfig>(new NestesConfig()); + Execution<NestedConf> exec = new Execution<NestedConf>(new NestedConf()); // assertThat(exec.getConfiguration().getContext().getThreadableStages().size(), is(3)); } - private class NestesConfig extends Configuration { + private class NestedConf extends Configuration { private final InitialElementProducer<Object> init; private final Sink sink; private final TestNestingCompositeStage compositeStage; - public NestesConfig() { + public NestedConf() { init = new InitialElementProducer<Object>(new Object()); sink = new Sink(); compositeStage = new TestNestingCompositeStage(); @@ -52,7 +52,7 @@ public class AbstractCompositeStageTest { private final Counter firstCounter = new Counter(); public TestCompositeOneStage() { - addThreadableStage(firstCounter); + declareActive(firstCounter); } } @@ -63,7 +63,7 @@ public class AbstractCompositeStageTest { private final Counter secondCounter = new Counter(); public TestCompositeTwoStage() { - addThreadableStage(firstCounter); + declareActive(firstCounter); connectPorts(firstCounter.getOutputPort(), secondCounter.getInputPort()); } diff --git a/src/test/java/teetime/framework/ExecutionTest.java b/src/test/java/teetime/framework/ExecutionTest.java index c256f27b29d92b49ad2087a6a64b76ee88847284..dcd445058654b7ad547a654ed4de4f9a46f9b745 100644 --- a/src/test/java/teetime/framework/ExecutionTest.java +++ b/src/test/java/teetime/framework/ExecutionTest.java @@ -118,7 +118,7 @@ public class ExecutionTest { public AnalysisTestConfig(final boolean inter) { connectPorts(init.getOutputPort(), sink.getInputPort()); if (inter) { - addThreadableStage(sink); + declareActive(sink); } } } @@ -143,7 +143,7 @@ public class ExecutionTest { connectPorts(init.getOutputPort(), iof.getInputPort()); connectPorts(iof.getMatchedOutputPort(), sink.getInputPort()); connectPorts(init.createOutputPort(), sink.createInputPort()); - addThreadableStage(iof); + declareActive(iof); } } @@ -191,7 +191,7 @@ public class ExecutionTest { stageWithNamedThread = new InitialElementProducer<Object>(new Object()); Sink<Object> sink = new Sink<Object>(); - addThreadableStage(stageWithNamedThread, "TestName"); + declareActive(stageWithNamedThread, "TestName"); connectPorts(stageWithNamedThread.getOutputPort(), sink.getInputPort()); } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index 3f61179279e809343854715e2419fc23e7e9bb16..3086ed51200b02f080e64a23c65e17970ffe4c4a 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -30,11 +30,11 @@ public class RunnableConsumerStageTestConfiguration extends Configuration { public RunnableConsumerStageTestConfiguration(final Integer... inputElements) { InitialElementProducer<Integer> producer = new InitialElementProducer<Integer>(inputElements); if (inputElements.length > 0) { - addThreadableStage(producer); + declareActive(producer); } CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements); - addThreadableStage(collectorSink); + declareActive(collectorSink); // Can not use createPorts, as the if condition above will lead to an exception new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort()); diff --git a/src/test/java/teetime/framework/StageTest.java b/src/test/java/teetime/framework/StageTest.java index 937a3b2fd0d6c944354ca426a5e6371f3478c0bb..3081b34678e7e3d81ce154d6ec65b9335f61d708 100644 --- a/src/test/java/teetime/framework/StageTest.java +++ b/src/test/java/teetime/framework/StageTest.java @@ -18,7 +18,9 @@ package teetime.framework; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import org.junit.Assert; import org.junit.Test; @@ -53,6 +55,13 @@ public class StageTest { assertEquals(tc.init.exceptionListener, tc.delay.exceptionListener); } + @Test + public void testActiveFlag() { + TestConfig config = new TestConfig(); + assertTrue(config.init.isActive()); + assertFalse(config.delay.isActive()); + } + private static class TestConfig extends Configuration { public final DelayAndTerminate delay; public InitialElementProducer<String> init; diff --git a/src/test/java/teetime/framework/TerminationTest.java b/src/test/java/teetime/framework/TerminationTest.java index ac8767608de4dfc61647426a8b7579929dc7fd70..452cdbbee637beeb42cbfcf8ec1bdf77e9ce7cac 100644 --- a/src/test/java/teetime/framework/TerminationTest.java +++ b/src/test/java/teetime/framework/TerminationTest.java @@ -55,11 +55,11 @@ public class TerminationTest { connectPorts(init.getOutputPort(), firstProp.getInputPort()); connectPorts(firstProp.getOutputPort(), sinkStage.getInputPort(), capacity); connectPorts(sinkStage.getOutputPort(), finalProp.getInputPort()); - addThreadableStage(sinkStage); + declareActive(sinkStage); } else { Sink<Integer> sink = new Sink<Integer>(); connectPorts(init.getOutputPort(), sink.getInputPort(), capacity); - addThreadableStage(sink); + declareActive(sink); } } diff --git a/src/test/java/teetime/framework/TraverserTest.java b/src/test/java/teetime/framework/TraverserTest.java index 27878602463535c0cd8a37df0254abc9274fac68..aeae76c9bcee341026d9e595dcb272dcca29194c 100644 --- a/src/test/java/teetime/framework/TraverserTest.java +++ b/src/test/java/teetime/framework/TraverserTest.java @@ -86,14 +86,14 @@ public class TraverserTest { connectPorts(distributor.getNewOutputPort(), wc.getInputPort()); connectPorts(wc.getOutputPort(), merger.getNewInputPort()); // Add WordCounter as a threadable stage, so it runs in its own thread - addThreadableStage(wc.getInputPort().getOwningStage()); + declareActive(wc.getInputPort().getOwningStage()); } // Connect the stages of the last part connectPorts(merger.getOutputPort(), result.getInputPort()); // Add the first and last part to the threadable stages - addThreadableStage(merger); + declareActive(merger); } } diff --git a/src/test/java/teetime/framework/WaitStrategyConfiguration.java b/src/test/java/teetime/framework/WaitStrategyConfiguration.java index 02730ddeb2d69a5f8156d200fb53196581ab7d1e..8003cb7d5debb20b12bacafbc7370c8fbb801f79 100644 --- a/src/test/java/teetime/framework/WaitStrategyConfiguration.java +++ b/src/test/java/teetime/framework/WaitStrategyConfiguration.java @@ -29,13 +29,13 @@ class WaitStrategyConfiguration extends Configuration { public WaitStrategyConfiguration(final long initialDelayInMs, final Object... elements) { Stage producer = buildProducer(elements); - addThreadableStage(producer); + declareActive(producer); Stage consumer = buildConsumer(delay); - addThreadableStage(consumer); + declareActive(consumer); Clock clock = buildClock(initialDelayInMs, delay); - addThreadableStage(clock); + declareActive(clock); } private Clock buildClock(final long initialDelayInMs, final Delay<Object> delay) { diff --git a/src/test/java/teetime/framework/YieldStrategyConfiguration.java b/src/test/java/teetime/framework/YieldStrategyConfiguration.java index 46af8c04b89bc306b66063f6186235f10ab9b0b8..0cc890ea38ff5ce43b4e9e6ca7d372d795eb5d84 100644 --- a/src/test/java/teetime/framework/YieldStrategyConfiguration.java +++ b/src/test/java/teetime/framework/YieldStrategyConfiguration.java @@ -26,10 +26,10 @@ class YieldStrategyConfiguration extends Configuration { public YieldStrategyConfiguration(final Object... elements) { InitialElementProducer<Object> producer = buildProducer(elements); - addThreadableStage(producer); + declareActive(producer); Stage consumer = buildConsumer(producer); - addThreadableStage(consumer); + declareActive(consumer); } private InitialElementProducer<Object> buildProducer(final Object... elements) { diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java index 9229eb758a8a5cb16a7ed0618e84e6e62ea10841..54de3f41a809092060f4dbfd945a4f96e39f5d90 100644 --- a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java +++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java @@ -33,8 +33,8 @@ public class ExceptionTestConfiguration extends Configuration { connectPorts(first.getOutputPort(), second.getInputPort()); // this.addThreadableStage(new ExceptionTestStage()); - this.addThreadableStage(second); - this.addThreadableStage(third); + this.declareActive(second); + this.declareActive(third); } } diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java index 26a61ce6603a9e6fc2eb0ec631a81c172815b8b8..553c5345636b6ec41f9995d390e6b8bdef01c323 100644 --- a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java +++ b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java @@ -137,8 +137,8 @@ public class DynamicDistributorTest { connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort()); connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort()); - addThreadableStage(distributor); - addThreadableStage(collectorSink); + declareActive(distributor); + declareActive(collectorSink); for (PortAction<DynamicDistributor<T>> a : inputActions) { distributor.addPortActionRequest(a); diff --git a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java index 5d8cb94894df1dd0857f778b3a2e32e93c18187c..0b5187a1e853ace01ee6031e7e466a69d2bf4347 100644 --- a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java +++ b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java @@ -118,7 +118,7 @@ public class DynamicMergerTest { connectPorts(initialElementProducer.getOutputPort(), merger.getNewInputPort()); connectPorts(merger.getOutputPort(), collectorSink.getInputPort()); - addThreadableStage(merger); + declareActive(merger); for (PortAction<DynamicMerger<T>> a : inputActions) { boolean added = merger.addPortActionRequest(a);