Skip to content
Snippets Groups Projects
Commit fefbc35c authored by Christian Wulf's avatar Christian Wulf
Browse files

closes #167

parent beb2c6de
No related branches found
No related tags found
No related merge requests found
......@@ -63,6 +63,9 @@ public class Merger<T> extends AbstractStage {
private final IMergerStrategy strategy;
/**
* A merger using the {@link RoundRobinStrategy}}.
*/
public Merger() {
this(new RoundRobinStrategy());
}
......
......@@ -19,10 +19,11 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.util.Arrays;
import org.junit.Before;
import org.junit.Test;
import teetime.stage.Counter;
import teetime.stage.InitialElementProducer;
import teetime.stage.basic.Sink;
import teetime.testutil.AssertHelper;
......@@ -30,10 +31,11 @@ import teetime.testutil.AssertHelper;
/**
* Tests whether
* <ul>
* <li>setting a stage active within a composite stage works
* <li>creating a producer within a composite state works
* <li>creating and connecting two stages within a composite stage works
* <li>flattening of a composite stage works
* <li>creating and connecting two stages within a composite stage works ({@link #ensureFlatteningDepth1AtRuntime()})
* <li>flattening of a composite stage works ({@link #ensureFlatteningDepth1AtRuntime()})
* <li>different levels of composite stages work ({@link #ensureFlatteningDepth2AtRuntime()})
* <li>setting a stage active within a composite stage works ({@link #ensureWorkingCompositeStageWithInternalProducerAndActiveMerger()})
* <li>creating a producer within a composite state works ({@link #ensureWorkingCompositeStageWithInternalProducerAndActiveMerger()})
* </ul>
*
* @author Christian Wulf
......@@ -46,12 +48,6 @@ public class AbstractCompositeStageTest {
AbstractStage.clearInstanceCounters(); // resets the id to zero
}
@Test
public void ensureCorrectNumberOfActiveStages() {
Execution<NestedConf> exec = new Execution<NestedConf>(new NestedConf());
assertThat(exec.getConfiguration().getContext().getThreadableStages().size(), is(3));
}
@Test
public void ensureFlatteningDepth1AtRuntime() {
CounterContainer element = new CounterContainer();
......@@ -79,6 +75,14 @@ public class AbstractCompositeStageTest {
assertLastStage(stage);
}
@Test
public void ensureWorkingCompositeStageWithInternalProducerAndActiveMerger() {
Execution<CompositeProducerConfig> execution = new Execution<CompositeProducerConfig>(new CompositeProducerConfig());
execution.executeBlocking();
assertThat(execution.getConfiguration().getResultElements(), is(Arrays.asList(5, 0, 6, 1, 7, 2, 8, 3, 9, 4)));
}
private InitialElementProducer<CounterContainer> assertFirstStage(final Execution<CompositeCounterPipelineConfig> execution) {
InitialElementProducer<CounterContainer> producer = execution.getConfiguration().getProducer();
assertThat(producer.getId(), is(equalTo("InitialElementProducer-0")));
......@@ -105,56 +109,4 @@ public class AbstractCompositeStageTest {
Sink<?> sink = AssertHelper.assertInstanceOf(Sink.class, nextStage);
assertThat(sink.getId(), is(equalTo("Sink-0")));
}
private class NestedConf extends Configuration {
private final InitialElementProducer<Object> init;
private final Sink<Object> sink;
private final TestNestingCompositeStage compositeStage;
public NestedConf() {
init = new InitialElementProducer<Object>(new Object());
sink = new Sink<Object>();
compositeStage = new TestNestingCompositeStage();
connectPorts(init.getOutputPort(), compositeStage.firstCompositeStage.firstCounter.getInputPort());
connectPorts(compositeStage.secondCompositeStage.secondCounter.getOutputPort(), sink.getInputPort());
}
}
private class TestCompositeOneStage extends AbstractCompositeStage {
private final Counter firstCounter = new Counter();
public TestCompositeOneStage() {
firstCounter.declareActive();
}
}
private class TestCompositeTwoStage extends AbstractCompositeStage {
private final Counter firstCounter = new Counter();
private final Counter secondCounter = new Counter();
public TestCompositeTwoStage() {
firstCounter.declareActive();
connectPorts(firstCounter.getOutputPort(), secondCounter.getInputPort());
}
}
private class TestNestingCompositeStage extends AbstractCompositeStage {
public TestCompositeOneStage firstCompositeStage;
public TestCompositeTwoStage secondCompositeStage;
public TestNestingCompositeStage() {
firstCompositeStage = new TestCompositeOneStage();
secondCompositeStage = new TestCompositeTwoStage();
connectPorts(firstCompositeStage.firstCounter.getOutputPort(), secondCompositeStage.firstCounter.getInputPort());
}
}
}
......@@ -19,7 +19,6 @@ class CompositeCounterIncrementer extends AbstractCompositeStage {
lastStageInputPort = lastStage.getInputPort();
outputPort = lastStage.getOutputPort();
} else {
// NoopFilter<CounterContainer> lastStage = new NoopFilter<CounterContainer>();
CounterIncrementer lastStage = incrementer;
lastStageInputPort = lastStage.getInputPort();
outputPort = lastStage.getOutputPort();
......
package teetime.framework;
import java.util.List;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
class CompositeProducerConfig extends Configuration {
private final CollectorSink<Integer> sink;
public CompositeProducerConfig() {
InitialElementProducer<Integer> initialElementProducer = new InitialElementProducer<Integer>(0, 1, 2, 3, 4);
CompositeProducerStage<Integer> compositeProducerStage = new CompositeProducerStage<Integer>(5, 6, 7, 8, 9);
sink = new CollectorSink<Integer>();
connectPorts(initialElementProducer.getOutputPort(), compositeProducerStage.getInputPort());
connectPorts(compositeProducerStage.getOutputPort(), sink.getInputPort());
}
List<Integer> getResultElements() {
return sink.getElements();
}
}
package teetime.framework;
import teetime.stage.InitialElementProducer;
import teetime.stage.basic.merger.Merger;
/**
* A composite stage that contains a producer and an active merger.
*
* @author Christian Wulf
*
* @param <T>
* the element type of the producer
*/
class CompositeProducerStage<T> extends AbstractCompositeStage {
private final Merger<T> merger;
public CompositeProducerStage(final T... elements) {
InitialElementProducer<T> producer = new InitialElementProducer<T>(elements);
merger = new Merger<T>();
connectPorts(producer.getOutputPort(), merger.getNewInputPort());
merger.declareActive();
}
InputPort<T> getInputPort() {
return merger.getNewInputPort();
}
OutputPort<T> getOutputPort() {
return merger.getOutputPort();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment