diff --git a/src/test/java/teetime/framework/AbstractCompositeStageTest.java b/src/test/java/teetime/framework/AbstractCompositeStageTest.java index e5628e6e071c25fa519893b1392ab3d4e1b24bcc..6d09ac90ad4d8efc21cf6eb65259ad704b4fd87a 100644 --- a/src/test/java/teetime/framework/AbstractCompositeStageTest.java +++ b/src/test/java/teetime/framework/AbstractCompositeStageTest.java @@ -15,20 +15,95 @@ */ package teetime.framework; -import static org.junit.Assert.assertEquals; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import org.junit.Before; import org.junit.Test; import teetime.stage.Counter; import teetime.stage.InitialElementProducer; import teetime.stage.basic.Sink; +import teetime.testutil.AssertHelper; +/** + * Tests whether + * <ul> + * <li>setting a stage active within a composite stage works + * <li>creating a producer within a composite state works + * <li>creating and connecting two stages within a composite stage works + * <li>flattening of a composite stage works + * </ul> + * + * @author Christian Wulf + * + */ public class AbstractCompositeStageTest { + @Before + public void before() { + AbstractStage.clearInstanceCounters(); // resets the id to zero + } + @Test - public void testNestedStages() { + public void ensureCorrectNumberOfActiveStages() { Execution<NestedConf> exec = new Execution<NestedConf>(new NestedConf()); - assertEquals(exec.getConfiguration().getContext().getThreadableStages().size(), 3); + assertThat(exec.getConfiguration().getContext().getThreadableStages().size(), is(3)); + } + + @Test + public void ensureFlatteningDepth1AtRuntime() { + CounterContainer element = new CounterContainer(); + Execution<CompositeCounterPipelineConfig> execution = new Execution<CompositeCounterPipelineConfig>(new CompositeCounterPipelineConfig(1, element)); + + InitialElementProducer<CounterContainer> producer; + CounterIncrementer stage; + + producer = assertFirstStage(execution); + stage = assertSecondStage(producer); + assertLastStage(stage); + } + + @Test + public void ensureFlatteningDepth2AtRuntime() { + CounterContainer element = new CounterContainer(); + Execution<CompositeCounterPipelineConfig> execution = new Execution<CompositeCounterPipelineConfig>(new CompositeCounterPipelineConfig(2, element)); + + InitialElementProducer<CounterContainer> producer; + CounterIncrementer stage; + + producer = assertFirstStage(execution); + stage = assertSecondStage(producer); + stage = assertThirdStage(stage); + assertLastStage(stage); + } + + private InitialElementProducer<CounterContainer> assertFirstStage(final Execution<CompositeCounterPipelineConfig> execution) { + InitialElementProducer<CounterContainer> producer = execution.getConfiguration().getProducer(); + assertThat(producer.getId(), is(equalTo("InitialElementProducer-0"))); + return producer; + } + + private CounterIncrementer assertSecondStage(final InitialElementProducer<CounterContainer> producer) { + AbstractStage nextStage = producer.getOutputPort().getPipe().getTargetPort().getOwningStage(); + CounterIncrementer stage = AssertHelper.assertInstanceOf(CounterIncrementer.class, nextStage); + assertThat(stage.getId(), is(equalTo("CounterIncrementer-0"))); + return stage; + } + + private CounterIncrementer assertThirdStage(CounterIncrementer stage) { + AbstractStage nextStage = stage.getOutputPort().getPipe().getTargetPort().getOwningStage(); + stage = AssertHelper.assertInstanceOf(CounterIncrementer.class, nextStage); + assertThat(stage.getId(), is(equalTo("CounterIncrementer-1"))); + return stage; + } + + private void assertLastStage(final CounterIncrementer stage) { + AbstractStage nextStage; + nextStage = stage.getOutputPort().getPipe().getTargetPort().getOwningStage(); + Sink<?> sink = AssertHelper.assertInstanceOf(Sink.class, nextStage); + assertThat(sink.getId(), is(equalTo("Sink-0"))); } private class NestedConf extends Configuration { diff --git a/src/test/java/teetime/framework/CompositeCounterIncrementer.java b/src/test/java/teetime/framework/CompositeCounterIncrementer.java new file mode 100644 index 0000000000000000000000000000000000000000..4e36467efb6e32ad871355d5a34b6918ffa4275c --- /dev/null +++ b/src/test/java/teetime/framework/CompositeCounterIncrementer.java @@ -0,0 +1,39 @@ +package teetime.framework; + +class CompositeCounterIncrementer extends AbstractCompositeStage { + + private final InputPort<CounterContainer> inputPort; + private final OutputPort<CounterContainer> outputPort; + + public CompositeCounterIncrementer(final int depth) { + if (depth <= 0) { // one counter incrementer is always created + throw new IllegalArgumentException(); + } + + CounterIncrementer incrementer = new CounterIncrementer(); + this.inputPort = incrementer.getInputPort(); + + InputPort<CounterContainer> lastStageInputPort; + if (depth > 1) { + CompositeCounterIncrementer lastStage = new CompositeCounterIncrementer(depth - 1); + lastStageInputPort = lastStage.getInputPort(); + outputPort = lastStage.getOutputPort(); + } else { + // NoopFilter<CounterContainer> lastStage = new NoopFilter<CounterContainer>(); + CounterIncrementer lastStage = incrementer; + lastStageInputPort = lastStage.getInputPort(); + outputPort = lastStage.getOutputPort(); + } + + connectPorts(incrementer.getOutputPort(), lastStageInputPort); + } + + public InputPort<CounterContainer> getInputPort() { + return inputPort; + } + + public OutputPort<CounterContainer> getOutputPort() { + return outputPort; + } + +} diff --git a/src/test/java/teetime/framework/CompositeCounterPipelineConfig.java b/src/test/java/teetime/framework/CompositeCounterPipelineConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..7d7e631342ef8920bad6555fe2e8b4dc2643cb72 --- /dev/null +++ b/src/test/java/teetime/framework/CompositeCounterPipelineConfig.java @@ -0,0 +1,26 @@ +package teetime.framework; + +import teetime.stage.InitialElementProducer; +import teetime.stage.basic.Sink; + +class CompositeCounterPipelineConfig extends Configuration { + + private final InitialElementProducer<CounterContainer> producer; + + public CompositeCounterPipelineConfig(final int numMaxCounterIncrementors, final CounterContainer element) { + if (element == null) { + throw new IllegalArgumentException("element may not be null"); + } + + producer = new InitialElementProducer<CounterContainer>(element); + CompositeCounterIncrementer compositeCounterIncrementor = new CompositeCounterIncrementer(numMaxCounterIncrementors); + Sink<CounterContainer> sink = new Sink<CounterContainer>(); + + connectPorts(producer.getOutputPort(), compositeCounterIncrementor.getInputPort()); + connectPorts(compositeCounterIncrementor.getOutputPort(), sink.getInputPort()); + } + + public InitialElementProducer<CounterContainer> getProducer() { + return producer; + } +} diff --git a/src/test/java/teetime/framework/CounterContainer.java b/src/test/java/teetime/framework/CounterContainer.java new file mode 100644 index 0000000000000000000000000000000000000000..8b523befe1a596b4c1a1678ef0f9bd9d7fc971a6 --- /dev/null +++ b/src/test/java/teetime/framework/CounterContainer.java @@ -0,0 +1,25 @@ +package teetime.framework; + +/** + * Represents a counter which does not need to be boxed and unboxed each time it is accessed by a stage. + * + * @author Christian Wulf + * + */ +class CounterContainer { + + private long counter; + + public String inc() { + counter++; + return ""; + } + + public long getCounter() { + return counter; + } + + public void reset() { + counter = 0; + } +} diff --git a/src/test/java/teetime/framework/CounterIncrementer.java b/src/test/java/teetime/framework/CounterIncrementer.java new file mode 100644 index 0000000000000000000000000000000000000000..41dab87762f8ab77dc6d13cdff2eda9e9f1c4ef9 --- /dev/null +++ b/src/test/java/teetime/framework/CounterIncrementer.java @@ -0,0 +1,13 @@ +package teetime.framework; + +import teetime.stage.basic.AbstractFilter; + +class CounterIncrementer extends AbstractFilter<CounterContainer> { + + @Override + protected void execute(final CounterContainer element) { + element.inc(); + getOutputPort().send(element); + } + +} diff --git a/src/test/java/teetime/testutil/AssertHelper.java b/src/test/java/teetime/testutil/AssertHelper.java new file mode 100644 index 0000000000000000000000000000000000000000..bc8878163ea6e6e85ba6b7df7a199dc563631dd3 --- /dev/null +++ b/src/test/java/teetime/testutil/AssertHelper.java @@ -0,0 +1,18 @@ +package teetime.testutil; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertThat; + +public final class AssertHelper { + + private AssertHelper() { + // utility class + } + + @SuppressWarnings("unchecked") + public static <S, T extends S> T assertInstanceOf(final Class<T> expectedClazz, final S object) { + assertThat(object, is(instanceOf(expectedClazz))); + return (T) object; + } +}