diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 544d55cbaa9fe94e7ea272a0fda6be34af597928..4a835710077113e90334e61b0991e7f1dad0b7dc 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -32,31 +32,6 @@ public abstract class AbstractCompositeStage { */ private static final int DEFAULT_CAPACITY = 4; - /** - * Execute this method, to add a stage to the configuration, which should be executed in a own thread. - * - * @param stage - * A arbitrary stage, which will be added to the configuration and executed in a thread. - */ - protected final void declareActive(final Stage stage) { - this.declareActive(stage, stage.getId()); - } - - /** - * Execute this method, to add a stage to the configuration, which should be executed in a own thread. - * - * @param stage - * A arbitrary stage, which will be added to the configuration and executed in a thread. - * @param threadName - * A string which can be used for debugging. - */ - protected void declareActive(final Stage stage, final String threadName) { - AbstractRunnableStage runnable = AbstractRunnableStage.create(stage); - Thread newThread = new TeeTimeThread(runnable, threadName); - stage.setOwningThread(newThread); - stage.setActive(true); - } - /** * Connects two ports with a pipe with a default capacity of currently {@value #DEFAULT_CAPACITY}. * @@ -86,7 +61,7 @@ public abstract class AbstractCompositeStage { protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { if (sourcePort.getOwningStage().getInputPorts().size() == 0) { if (sourcePort.getOwningStage().getOwningThread() == null) { - declareActive(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId()); + sourcePort.getOwningStage().declareActive(); } } diff --git a/src/main/java/teetime/framework/Configuration.java b/src/main/java/teetime/framework/Configuration.java index fb6270a0cbc397aa90a1d7a7a20cbfe8a484502d..2ffa6376a0768cd579f29f3a30af8d6e85531124 100644 --- a/src/main/java/teetime/framework/Configuration.java +++ b/src/main/java/teetime/framework/Configuration.java @@ -64,10 +64,14 @@ public abstract class Configuration extends AbstractCompositeStage { return factory; } - @Override - protected void declareActive(final Stage stage, final String threadName) { - startStage = stage; // memorize an arbitrary stage as starting point for traversing - super.declareActive(stage, threadName); + /** + * Register pipes if your configuration only relies on custom pipes and therefore {@link #connectPorts(OutputPort, InputPort)} is never called. + * + * @param pipe + * A custom pipe instance + */ + protected void registerCustomPipe(final AbstractPipe<?> pipe) { + startStage = pipe.getSourcePort().getOwningStage(); // memorize an arbitrary stage as starting point for traversing } @Override diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 4a246b5916a804f42838ceb51a0479a02a6ae884..dcc6b97f8cd2f47377410ac6e5f16f02dd642c94 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -191,4 +191,29 @@ public abstract class Stage { this.isActive = isActive; } + /** + * Execute this method, to add a stage to the configuration, which should be executed in a own thread. + * + * @param stage + * A arbitrary stage, which will be added to the configuration and executed in a thread. + */ + public void declareActive() { + declareActive(getId()); + } + + /** + * Execute this method, to add a stage to the configuration, which should be executed in a own thread. + * + * @param stage + * A arbitrary stage, which will be added to the configuration and executed in a thread. + * @param threadName + * A string which can be used for debugging. + */ + public void declareActive(final String threadName) { + AbstractRunnableStage runnable = AbstractRunnableStage.create(this); + Thread newThread = new TeeTimeThread(runnable, threadName); + this.setOwningThread(newThread); + this.setActive(true); + } + } diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java index 4561e47e186e5956e733fa8e722a3c94a92d3051..c7656c57b43ede82997624b522d02105869b0fcd 100644 --- a/src/main/java/teetime/framework/ThreadService.java +++ b/src/main/java/teetime/framework/ThreadService.java @@ -61,7 +61,7 @@ class ThreadService extends AbstractService<ThreadService> { } void startStageAtRuntime(final Stage newStage) { - configuration.declareActive(newStage); + newStage.declareActive(); Set<Stage> newThreadableStages = initialize(newStage); startThreads(newThreadableStages); diff --git a/src/main/java/teetime/framework/test/StageTester.java b/src/main/java/teetime/framework/test/StageTester.java index eed58f55c0aa9cb89f68c7f4f15e9e011347541b..10313c679335d1f9b187fab647e69c6b2f54037f 100644 --- a/src/main/java/teetime/framework/test/StageTester.java +++ b/src/main/java/teetime/framework/test/StageTester.java @@ -91,7 +91,7 @@ public final class StageTester { connectPorts(producer.getOutputPort(), inputHolder.getPort()); } - declareActive(stage); + stage.declareActive(); for (OutputHolder<?> outputHolder : outputHolders) { final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutputElements()); diff --git a/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java b/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java index 529a72868b190e3f83be05f73375c546219f825a..af9e9c3fec67d309dfdafa9e9be0ae0aaed3968c 100644 --- a/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java +++ b/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java @@ -83,7 +83,7 @@ public class WordCounterConfiguration extends Configuration { connectPorts(distributor.getNewOutputPort(), threadableStage.getInputPort(), 1000); connectPorts(wc.getOutputPort(), merger.getNewInputPort()); // Add WordCounter as a threadable stage, so it runs in its own thread - declareActive(threadableStage.getInputPort().getOwningStage()); + threadableStage.getInputPort().getOwningStage().declareActive(); distributorPorts.add(threadableStage.getInputPort()); mergerPorts.add(wc.getOutputPort()); @@ -95,8 +95,8 @@ public class WordCounterConfiguration extends Configuration { connectPorts(merger.getOutputPort(), result.getInputPort()); // Add the first and last part to the threadable stages - declareActive(init); - declareActive(merger); + init.declareActive(); + merger.declareActive(); } public MonitoringThread getMonitoringThread() { diff --git a/src/test/java/teetime/framework/AbstractCompositeStageTest.java b/src/test/java/teetime/framework/AbstractCompositeStageTest.java index 418d999454536be19d70c55b8a3f1a4ac8cbc25d..8cd72834092fb08b5362b2c1e9e2fbf4ee727551 100644 --- a/src/test/java/teetime/framework/AbstractCompositeStageTest.java +++ b/src/test/java/teetime/framework/AbstractCompositeStageTest.java @@ -34,12 +34,12 @@ public class AbstractCompositeStageTest { private class NestedConf extends Configuration { private final InitialElementProducer<Object> init; - private final Sink sink; + private final Sink<Object> sink; private final TestNestingCompositeStage compositeStage; public NestedConf() { init = new InitialElementProducer<Object>(new Object()); - sink = new Sink(); + sink = new Sink<Object>(); compositeStage = new TestNestingCompositeStage(); connectPorts(init.getOutputPort(), compositeStage.firstCompositeStage.firstCounter.getInputPort()); connectPorts(compositeStage.secondCompositeStage.secondCounter.getOutputPort(), sink.getInputPort()); @@ -52,7 +52,7 @@ public class AbstractCompositeStageTest { private final Counter firstCounter = new Counter(); public TestCompositeOneStage() { - declareActive(firstCounter); + firstCounter.declareActive(); } } @@ -63,7 +63,7 @@ public class AbstractCompositeStageTest { private final Counter secondCounter = new Counter(); public TestCompositeTwoStage() { - declareActive(firstCounter); + firstCounter.declareActive(); connectPorts(firstCounter.getOutputPort(), secondCounter.getInputPort()); } diff --git a/src/test/java/teetime/framework/ExecutionTest.java b/src/test/java/teetime/framework/ExecutionTest.java index dcd445058654b7ad547a654ed4de4f9a46f9b745..617ab6a11a0fb4382261022ad6017fd68aad1d51 100644 --- a/src/test/java/teetime/framework/ExecutionTest.java +++ b/src/test/java/teetime/framework/ExecutionTest.java @@ -118,7 +118,7 @@ public class ExecutionTest { public AnalysisTestConfig(final boolean inter) { connectPorts(init.getOutputPort(), sink.getInputPort()); if (inter) { - declareActive(sink); + sink.declareActive(); } } } @@ -143,7 +143,7 @@ public class ExecutionTest { connectPorts(init.getOutputPort(), iof.getInputPort()); connectPorts(iof.getMatchedOutputPort(), sink.getInputPort()); connectPorts(init.createOutputPort(), sink.createInputPort()); - declareActive(iof); + iof.declareActive(); } } @@ -191,7 +191,7 @@ public class ExecutionTest { stageWithNamedThread = new InitialElementProducer<Object>(new Object()); Sink<Object> sink = new Sink<Object>(); - declareActive(stageWithNamedThread, "TestName"); + stageWithNamedThread.declareActive("TestName"); connectPorts(stageWithNamedThread.getOutputPort(), sink.getInputPort()); } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index 3086ed51200b02f080e64a23c65e17970ffe4c4a..7978645126e233b2443590fac2513643f1013457 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -18,6 +18,7 @@ package teetime.framework; import java.util.ArrayList; import java.util.List; +import teetime.framework.pipe.IPipe; import teetime.framework.pipe.SpScPipeFactory; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; @@ -30,14 +31,15 @@ public class RunnableConsumerStageTestConfiguration extends Configuration { public RunnableConsumerStageTestConfiguration(final Integer... inputElements) { InitialElementProducer<Integer> producer = new InitialElementProducer<Integer>(inputElements); if (inputElements.length > 0) { - declareActive(producer); + producer.declareActive(); } CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements); - declareActive(collectorSink); + collectorSink.declareActive(); // Can not use createPorts, as the if condition above will lead to an exception - new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort()); + IPipe pipe = new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort()); + registerCustomPipe((AbstractPipe<?>) pipe); this.collectorSink = collectorSink; } diff --git a/src/test/java/teetime/framework/TerminationTest.java b/src/test/java/teetime/framework/TerminationTest.java index 452cdbbee637beeb42cbfcf8ec1bdf77e9ce7cac..f08b772f4e252518e3ff3772b910c249559fa0f2 100644 --- a/src/test/java/teetime/framework/TerminationTest.java +++ b/src/test/java/teetime/framework/TerminationTest.java @@ -55,11 +55,11 @@ public class TerminationTest { connectPorts(init.getOutputPort(), firstProp.getInputPort()); connectPorts(firstProp.getOutputPort(), sinkStage.getInputPort(), capacity); connectPorts(sinkStage.getOutputPort(), finalProp.getInputPort()); - declareActive(sinkStage); + sinkStage.declareActive(); } else { Sink<Integer> sink = new Sink<Integer>(); connectPorts(init.getOutputPort(), sink.getInputPort(), capacity); - declareActive(sink); + sink.declareActive(); } } diff --git a/src/test/java/teetime/framework/TraverserTest.java b/src/test/java/teetime/framework/TraverserTest.java index aeae76c9bcee341026d9e595dcb272dcca29194c..b4f3c0de2e55ca314a1ab88ddf10e5a7c4610d0b 100644 --- a/src/test/java/teetime/framework/TraverserTest.java +++ b/src/test/java/teetime/framework/TraverserTest.java @@ -86,14 +86,14 @@ public class TraverserTest { connectPorts(distributor.getNewOutputPort(), wc.getInputPort()); connectPorts(wc.getOutputPort(), merger.getNewInputPort()); // Add WordCounter as a threadable stage, so it runs in its own thread - declareActive(wc.getInputPort().getOwningStage()); + wc.getInputPort().getOwningStage().declareActive(); } // Connect the stages of the last part connectPorts(merger.getOutputPort(), result.getInputPort()); // Add the first and last part to the threadable stages - declareActive(merger); + merger.declareActive(); } } diff --git a/src/test/java/teetime/framework/WaitStrategyConfiguration.java b/src/test/java/teetime/framework/WaitStrategyConfiguration.java index 8003cb7d5debb20b12bacafbc7370c8fbb801f79..507dc935b7fa48f71e161634a9cf943e1a8e6b81 100644 --- a/src/test/java/teetime/framework/WaitStrategyConfiguration.java +++ b/src/test/java/teetime/framework/WaitStrategyConfiguration.java @@ -29,13 +29,13 @@ class WaitStrategyConfiguration extends Configuration { public WaitStrategyConfiguration(final long initialDelayInMs, final Object... elements) { Stage producer = buildProducer(elements); - declareActive(producer); + producer.declareActive(); Stage consumer = buildConsumer(delay); - declareActive(consumer); + consumer.declareActive(); Clock clock = buildClock(initialDelayInMs, delay); - declareActive(clock); + clock.declareActive(); } private Clock buildClock(final long initialDelayInMs, final Delay<Object> delay) { diff --git a/src/test/java/teetime/framework/YieldStrategyConfiguration.java b/src/test/java/teetime/framework/YieldStrategyConfiguration.java index 0cc890ea38ff5ce43b4e9e6ca7d372d795eb5d84..a44f45bf89d7b818bbef0c59bb4482a3ca471c49 100644 --- a/src/test/java/teetime/framework/YieldStrategyConfiguration.java +++ b/src/test/java/teetime/framework/YieldStrategyConfiguration.java @@ -26,10 +26,10 @@ class YieldStrategyConfiguration extends Configuration { public YieldStrategyConfiguration(final Object... elements) { InitialElementProducer<Object> producer = buildProducer(elements); - declareActive(producer); + producer.declareActive(); Stage consumer = buildConsumer(producer); - declareActive(consumer); + consumer.declareActive(); } private InitialElementProducer<Object> buildProducer(final Object... elements) { diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java index 54de3f41a809092060f4dbfd945a4f96e39f5d90..79f7ff1dd23f5ed9006ed4bd1d78f54e0b946c06 100644 --- a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java +++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java @@ -33,8 +33,8 @@ public class ExceptionTestConfiguration extends Configuration { connectPorts(first.getOutputPort(), second.getInputPort()); // this.addThreadableStage(new ExceptionTestStage()); - this.declareActive(second); - this.declareActive(third); + second.declareActive(); + third.declareActive(); } } diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java index 553c5345636b6ec41f9995d390e6b8bdef01c323..f8df4b0751389d020d4eb695adac3c5853f7f874 100644 --- a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java +++ b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java @@ -137,8 +137,8 @@ public class DynamicDistributorTest { connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort()); connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort()); - declareActive(distributor); - declareActive(collectorSink); + distributor.declareActive(); + collectorSink.declareActive(); for (PortAction<DynamicDistributor<T>> a : inputActions) { distributor.addPortActionRequest(a); diff --git a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java index 0b5187a1e853ace01ee6031e7e466a69d2bf4347..fdff183633c7404cd6f21fbca3a17986d9ad68e2 100644 --- a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java +++ b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java @@ -118,7 +118,7 @@ public class DynamicMergerTest { connectPorts(initialElementProducer.getOutputPort(), merger.getNewInputPort()); connectPorts(merger.getOutputPort(), collectorSink.getInputPort()); - declareActive(merger); + merger.declareActive(); for (PortAction<DynamicMerger<T>> a : inputActions) { boolean added = merger.addPortActionRequest(a);