diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 6ed7a57dd69949696e17dd850c36b61a52c536c3..5f4f395322c11f1d0ac2e3ef8a24a60d30254dad 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -21,8 +21,7 @@ </action> <action dev="ntd" type="add" issue="171"> Configurations are now - built within the Configuration class which is passed on to nested - CompositeStages. + built within the Configuration class. This removes any constraints on CompositeStages and enables therefore multiple connections and multithreading. </action> diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 26e8685ef260a18ca2dcf1705cf71f98bff4ba92..060f0e773d0124af0379c72efbb7c005ef437b99 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -32,14 +32,11 @@ public abstract class AbstractCompositeStage { private final ConfigurationContext context; - public AbstractCompositeStage(final ConfigurationContext context) { - if (null == context) { - throw new IllegalArgumentException("Context may not be null."); - } - this.context = context; + public AbstractCompositeStage() { + this.context = new ConfigurationContext(); } - protected ConfigurationContext getContext() { + ConfigurationContext getContext() { return context; } diff --git a/src/main/java/teetime/framework/Configuration.java b/src/main/java/teetime/framework/Configuration.java index bfaf92150fc223a0abc6347e96cf0f812ff4488b..f97226447347bc629138231a0845a5a223aceedd 100644 --- a/src/main/java/teetime/framework/Configuration.java +++ b/src/main/java/teetime/framework/Configuration.java @@ -25,8 +25,4 @@ package teetime.framework; */ public abstract class Configuration extends AbstractCompositeStage { - public Configuration() { - super(new ConfigurationContext()); - } - } diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index 483b1ecbdb93ee23cfd9fec89906d947ba1d9787..87bb41bf49b0cf5d0e12c1391673ac44dff551e7 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -29,11 +29,13 @@ import teetime.framework.pipe.InstantiationPipe; * * @since 2.0 */ -public final class ConfigurationContext { +final class ConfigurationContext { + + public static final ConfigurationContext EMPTY_CONTEXT = new ConfigurationContext(); private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class); - private final Map<Stage, String> threadableStages = new HashMap<Stage, String>(); + private Map<Stage, String> threadableStages = new HashMap<Stage, String>(); ConfigurationContext() {} @@ -45,6 +47,7 @@ public final class ConfigurationContext { * @see AbstractCompositeStage#addThreadableStage(Stage) */ final void addThreadableStage(final Stage stage, final String threadName) { + mergeContexts(stage); if (this.threadableStages.put(stage, threadName) != null) { LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage."); } @@ -63,7 +66,21 @@ public final class ConfigurationContext { LOGGER.warn("Overwriting existing pipe while connecting stages " + sourcePort.getOwningStage().getId() + " and " + targetPort.getOwningStage().getId() + "."); } + mergeContexts(sourcePort.getOwningStage()); + mergeContexts(targetPort.getOwningStage()); new InstantiationPipe(sourcePort, targetPort, capacity); } + final void mergeContexts(final Stage stage) { + if (!stage.owningContext.equals(EMPTY_CONTEXT)) { + if (stage.owningContext != this) { // Performance + this.threadableStages.putAll(stage.owningContext.threadableStages); + stage.owningContext.threadableStages = this.threadableStages; + } + } else { + stage.owningContext = this; + } + + } + } diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 1c9ce2e20146941636db8db2f4e19b826b8d47f8..5146a2e85bdfa12e6092389077785c5287313f82 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -48,6 +48,8 @@ public abstract class Stage { /** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */ protected Thread owningThread; + ConfigurationContext owningContext = ConfigurationContext.EMPTY_CONTEXT; + protected Stage() { this.id = this.createId(); this.logger = LoggerFactory.getLogger(this.getClass().getCanonicalName() + ":" + id); diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java index 6f38fbab37e218bcde981c63263513d8a993632e..c03f564679d2112ba9dceae96a8b58f8c7a4b6fb 100644 --- a/src/main/java/teetime/stage/io/EveryXthPrinter.java +++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java @@ -19,7 +19,6 @@ import java.util.ArrayList; import java.util.List; import teetime.framework.AbstractCompositeStage; -import teetime.framework.ConfigurationContext; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.Stage; @@ -32,8 +31,7 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage { private final Distributor<T> distributor; private final List<Stage> lastStages = new ArrayList<Stage>(); - public EveryXthPrinter(final int threshold, final ConfigurationContext context) { - super(context); + public EveryXthPrinter(final int threshold) { distributor = new Distributor<T>(new CopyByReferenceStrategy()); EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold); Printer<Integer> printer = new Printer<Integer>(); diff --git a/src/main/java/teetime/stage/string/WordCounter.java b/src/main/java/teetime/stage/string/WordCounter.java index 1508c15edbf052f26a265df4253c750ef2917944..46d22380f947bc1023eb2594f404928d2f79a347 100644 --- a/src/main/java/teetime/stage/string/WordCounter.java +++ b/src/main/java/teetime/stage/string/WordCounter.java @@ -16,7 +16,6 @@ package teetime.stage.string; import teetime.framework.AbstractCompositeStage; -import teetime.framework.ConfigurationContext; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.stage.MappingCounter; @@ -36,8 +35,7 @@ public final class WordCounter extends AbstractCompositeStage { private final Tokenizer tokenizer; private final MappingCounter<String> mapCounter; - public WordCounter(final ConfigurationContext context) { - super(context); + public WordCounter() { this.tokenizer = new Tokenizer(" "); final ToLowerCase toLowerCase = new ToLowerCase(); diff --git a/src/site/markdown/wiki b/src/site/markdown/wiki new file mode 160000 index 0000000000000000000000000000000000000000..709c839c447a50c93b37fcc633a01297115d4823 --- /dev/null +++ b/src/site/markdown/wiki @@ -0,0 +1 @@ +Subproject commit 709c839c447a50c93b37fcc633a01297115d4823 diff --git a/src/test/java/teetime/framework/AbstractCompositeStageTest.java b/src/test/java/teetime/framework/AbstractCompositeStageTest.java index 3ca971fb3f31ea250e8ebcc718693f526e8df1f5..f023d6199695e0a3b47589d5b3c40db3d7a96706 100644 --- a/src/test/java/teetime/framework/AbstractCompositeStageTest.java +++ b/src/test/java/teetime/framework/AbstractCompositeStageTest.java @@ -15,7 +15,72 @@ */ package teetime.framework; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import org.junit.Test; + +import teetime.stage.Counter; +import teetime.stage.InitialElementProducer; +import teetime.stage.basic.Sink; public class AbstractCompositeStageTest { + @Test + public void testNestedStages() { + Execution<NestesConfig> exec = new Execution<NestesConfig>(new NestesConfig()); + assertThat(exec.getConfiguration().getContext().getThreadableStages().size(), is(3)); + } + + private class NestesConfig extends Configuration { + + private final InitialElementProducer<Object> init; + private final Sink sink; + private final TestNestingCompositeStage compositeStage; + + public NestesConfig() { + init = new InitialElementProducer<Object>(new Object()); + sink = new Sink(); + compositeStage = new TestNestingCompositeStage(); + connectPorts(init.getOutputPort(), compositeStage.firstCompositeStage.firstCounter.getInputPort()); + connectPorts(compositeStage.secondCompositeStage.secondCounter.getOutputPort(), sink.getInputPort()); + + } + } + + private class TestCompositeOneStage extends AbstractCompositeStage { + + private final Counter firstCounter = new Counter(); + + public TestCompositeOneStage() { + addThreadableStage(firstCounter); + } + + } + + private class TestCompositeTwoStage extends AbstractCompositeStage { + + private final Counter firstCounter = new Counter(); + private final Counter secondCounter = new Counter(); + + public TestCompositeTwoStage() { + addThreadableStage(firstCounter); + connectPorts(firstCounter.getOutputPort(), secondCounter.getInputPort()); + } + + } + + private class TestNestingCompositeStage extends AbstractCompositeStage { + + public TestCompositeOneStage firstCompositeStage; + public TestCompositeTwoStage secondCompositeStage; + + public TestNestingCompositeStage() { + firstCompositeStage = new TestCompositeOneStage(); + secondCompositeStage = new TestCompositeTwoStage(); + connectPorts(firstCompositeStage.firstCounter.getOutputPort(), secondCompositeStage.firstCounter.getInputPort()); + } + + } + } diff --git a/src/test/java/teetime/framework/TraversorTest.java b/src/test/java/teetime/framework/TraversorTest.java index 95e118b1695bafeda2d14f3829c383e10bcfcff2..ec0829cecaff06ad0627c66ef225bfb9a491c4eb 100644 --- a/src/test/java/teetime/framework/TraversorTest.java +++ b/src/test/java/teetime/framework/TraversorTest.java @@ -77,7 +77,7 @@ public class TraversorTest { // Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages for (int i = 0; i < threads; i++) { // final InputPortSizePrinter<String> inputPortSizePrinter = new InputPortSizePrinter<String>(); - final WordCounter wc = new WordCounter(this.getContext()); + final WordCounter wc = new WordCounter(); // intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort()); connectPorts(distributor.getNewOutputPort(), wc.getInputPort());