diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index 9f6ca96b0009a5bbae9f15857474d9b70790830e..254aad95e30c0b98ed425a462b453e1b9b38830b 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -63,6 +63,9 @@ public class Merger<T> extends AbstractStage { private final IMergerStrategy strategy; + /** + * A merger using the {@link RoundRobinStrategy}}. + */ public Merger() { this(new RoundRobinStrategy()); } diff --git a/src/test/java/teetime/framework/AbstractCompositeStageTest.java b/src/test/java/teetime/framework/AbstractCompositeStageTest.java index 6d09ac90ad4d8efc21cf6eb65259ad704b4fd87a..d278c659515eb9f751d7b31e503898f7d9600408 100644 --- a/src/test/java/teetime/framework/AbstractCompositeStageTest.java +++ b/src/test/java/teetime/framework/AbstractCompositeStageTest.java @@ -19,10 +19,11 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import java.util.Arrays; + import org.junit.Before; import org.junit.Test; -import teetime.stage.Counter; import teetime.stage.InitialElementProducer; import teetime.stage.basic.Sink; import teetime.testutil.AssertHelper; @@ -30,10 +31,11 @@ import teetime.testutil.AssertHelper; /** * Tests whether * <ul> - * <li>setting a stage active within a composite stage works - * <li>creating a producer within a composite state works - * <li>creating and connecting two stages within a composite stage works - * <li>flattening of a composite stage works + * <li>creating and connecting two stages within a composite stage works ({@link #ensureFlatteningDepth1AtRuntime()}) + * <li>flattening of a composite stage works ({@link #ensureFlatteningDepth1AtRuntime()}) + * <li>different levels of composite stages work ({@link #ensureFlatteningDepth2AtRuntime()}) + * <li>setting a stage active within a composite stage works ({@link #ensureWorkingCompositeStageWithInternalProducerAndActiveMerger()}) + * <li>creating a producer within a composite state works ({@link #ensureWorkingCompositeStageWithInternalProducerAndActiveMerger()}) * </ul> * * @author Christian Wulf @@ -46,12 +48,6 @@ public class AbstractCompositeStageTest { AbstractStage.clearInstanceCounters(); // resets the id to zero } - @Test - public void ensureCorrectNumberOfActiveStages() { - Execution<NestedConf> exec = new Execution<NestedConf>(new NestedConf()); - assertThat(exec.getConfiguration().getContext().getThreadableStages().size(), is(3)); - } - @Test public void ensureFlatteningDepth1AtRuntime() { CounterContainer element = new CounterContainer(); @@ -79,6 +75,14 @@ public class AbstractCompositeStageTest { assertLastStage(stage); } + @Test + public void ensureWorkingCompositeStageWithInternalProducerAndActiveMerger() { + Execution<CompositeProducerConfig> execution = new Execution<CompositeProducerConfig>(new CompositeProducerConfig()); + execution.executeBlocking(); + + assertThat(execution.getConfiguration().getResultElements(), is(Arrays.asList(5, 0, 6, 1, 7, 2, 8, 3, 9, 4))); + } + private InitialElementProducer<CounterContainer> assertFirstStage(final Execution<CompositeCounterPipelineConfig> execution) { InitialElementProducer<CounterContainer> producer = execution.getConfiguration().getProducer(); assertThat(producer.getId(), is(equalTo("InitialElementProducer-0"))); @@ -105,56 +109,4 @@ public class AbstractCompositeStageTest { Sink<?> sink = AssertHelper.assertInstanceOf(Sink.class, nextStage); assertThat(sink.getId(), is(equalTo("Sink-0"))); } - - private class NestedConf extends Configuration { - - private final InitialElementProducer<Object> init; - private final Sink<Object> sink; - private final TestNestingCompositeStage compositeStage; - - public NestedConf() { - init = new InitialElementProducer<Object>(new Object()); - sink = new Sink<Object>(); - compositeStage = new TestNestingCompositeStage(); - connectPorts(init.getOutputPort(), compositeStage.firstCompositeStage.firstCounter.getInputPort()); - connectPorts(compositeStage.secondCompositeStage.secondCounter.getOutputPort(), sink.getInputPort()); - - } - } - - private class TestCompositeOneStage extends AbstractCompositeStage { - - private final Counter firstCounter = new Counter(); - - public TestCompositeOneStage() { - firstCounter.declareActive(); - } - - } - - private class TestCompositeTwoStage extends AbstractCompositeStage { - - private final Counter firstCounter = new Counter(); - private final Counter secondCounter = new Counter(); - - public TestCompositeTwoStage() { - firstCounter.declareActive(); - connectPorts(firstCounter.getOutputPort(), secondCounter.getInputPort()); - } - - } - - private class TestNestingCompositeStage extends AbstractCompositeStage { - - public TestCompositeOneStage firstCompositeStage; - public TestCompositeTwoStage secondCompositeStage; - - public TestNestingCompositeStage() { - firstCompositeStage = new TestCompositeOneStage(); - secondCompositeStage = new TestCompositeTwoStage(); - connectPorts(firstCompositeStage.firstCounter.getOutputPort(), secondCompositeStage.firstCounter.getInputPort()); - } - - } - } diff --git a/src/test/java/teetime/framework/CompositeCounterIncrementer.java b/src/test/java/teetime/framework/CompositeCounterIncrementer.java index 4e36467efb6e32ad871355d5a34b6918ffa4275c..68a3e71e61ae4649bb06a229e48b7959549b12ba 100644 --- a/src/test/java/teetime/framework/CompositeCounterIncrementer.java +++ b/src/test/java/teetime/framework/CompositeCounterIncrementer.java @@ -19,7 +19,6 @@ class CompositeCounterIncrementer extends AbstractCompositeStage { lastStageInputPort = lastStage.getInputPort(); outputPort = lastStage.getOutputPort(); } else { - // NoopFilter<CounterContainer> lastStage = new NoopFilter<CounterContainer>(); CounterIncrementer lastStage = incrementer; lastStageInputPort = lastStage.getInputPort(); outputPort = lastStage.getOutputPort(); diff --git a/src/test/java/teetime/framework/CompositeProducerConfig.java b/src/test/java/teetime/framework/CompositeProducerConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..16ffb163d736846c93f7e4a8c5ce4b28bedf07b6 --- /dev/null +++ b/src/test/java/teetime/framework/CompositeProducerConfig.java @@ -0,0 +1,24 @@ +package teetime.framework; + +import java.util.List; + +import teetime.stage.CollectorSink; +import teetime.stage.InitialElementProducer; + +class CompositeProducerConfig extends Configuration { + + private final CollectorSink<Integer> sink; + + public CompositeProducerConfig() { + InitialElementProducer<Integer> initialElementProducer = new InitialElementProducer<Integer>(0, 1, 2, 3, 4); + CompositeProducerStage<Integer> compositeProducerStage = new CompositeProducerStage<Integer>(5, 6, 7, 8, 9); + sink = new CollectorSink<Integer>(); + + connectPorts(initialElementProducer.getOutputPort(), compositeProducerStage.getInputPort()); + connectPorts(compositeProducerStage.getOutputPort(), sink.getInputPort()); + } + + List<Integer> getResultElements() { + return sink.getElements(); + } +} diff --git a/src/test/java/teetime/framework/CompositeProducerStage.java b/src/test/java/teetime/framework/CompositeProducerStage.java new file mode 100644 index 0000000000000000000000000000000000000000..6f965b407943973955751a04b233a3bc746a18ff --- /dev/null +++ b/src/test/java/teetime/framework/CompositeProducerStage.java @@ -0,0 +1,34 @@ +package teetime.framework; + +import teetime.stage.InitialElementProducer; +import teetime.stage.basic.merger.Merger; + +/** + * A composite stage that contains a producer and an active merger. + * + * @author Christian Wulf + * + * @param <T> + * the element type of the producer + */ +class CompositeProducerStage<T> extends AbstractCompositeStage { + + private final Merger<T> merger; + + public CompositeProducerStage(final T... elements) { + InitialElementProducer<T> producer = new InitialElementProducer<T>(elements); + merger = new Merger<T>(); + + connectPorts(producer.getOutputPort(), merger.getNewInputPort()); + + merger.declareActive(); + } + + InputPort<T> getInputPort() { + return merger.getNewInputPort(); + } + + OutputPort<T> getOutputPort() { + return merger.getOutputPort(); + } +}