Skip to content
Snippets Groups Projects
Commit beb2c6de authored by Christian Wulf's avatar Christian Wulf
Browse files

added flatten test for composite stages (#167)

parent 312eb100
No related branches found
No related tags found
No related merge requests found
......@@ -15,20 +15,95 @@
*/
package teetime.framework;
import static org.junit.Assert.assertEquals;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import org.junit.Before;
import org.junit.Test;
import teetime.stage.Counter;
import teetime.stage.InitialElementProducer;
import teetime.stage.basic.Sink;
import teetime.testutil.AssertHelper;
/**
* Tests whether
* <ul>
* <li>setting a stage active within a composite stage works
* <li>creating a producer within a composite state works
* <li>creating and connecting two stages within a composite stage works
* <li>flattening of a composite stage works
* </ul>
*
* @author Christian Wulf
*
*/
public class AbstractCompositeStageTest {
@Before
public void before() {
AbstractStage.clearInstanceCounters(); // resets the id to zero
}
@Test
public void testNestedStages() {
public void ensureCorrectNumberOfActiveStages() {
Execution<NestedConf> exec = new Execution<NestedConf>(new NestedConf());
assertEquals(exec.getConfiguration().getContext().getThreadableStages().size(), 3);
assertThat(exec.getConfiguration().getContext().getThreadableStages().size(), is(3));
}
@Test
public void ensureFlatteningDepth1AtRuntime() {
CounterContainer element = new CounterContainer();
Execution<CompositeCounterPipelineConfig> execution = new Execution<CompositeCounterPipelineConfig>(new CompositeCounterPipelineConfig(1, element));
InitialElementProducer<CounterContainer> producer;
CounterIncrementer stage;
producer = assertFirstStage(execution);
stage = assertSecondStage(producer);
assertLastStage(stage);
}
@Test
public void ensureFlatteningDepth2AtRuntime() {
CounterContainer element = new CounterContainer();
Execution<CompositeCounterPipelineConfig> execution = new Execution<CompositeCounterPipelineConfig>(new CompositeCounterPipelineConfig(2, element));
InitialElementProducer<CounterContainer> producer;
CounterIncrementer stage;
producer = assertFirstStage(execution);
stage = assertSecondStage(producer);
stage = assertThirdStage(stage);
assertLastStage(stage);
}
private InitialElementProducer<CounterContainer> assertFirstStage(final Execution<CompositeCounterPipelineConfig> execution) {
InitialElementProducer<CounterContainer> producer = execution.getConfiguration().getProducer();
assertThat(producer.getId(), is(equalTo("InitialElementProducer-0")));
return producer;
}
private CounterIncrementer assertSecondStage(final InitialElementProducer<CounterContainer> producer) {
AbstractStage nextStage = producer.getOutputPort().getPipe().getTargetPort().getOwningStage();
CounterIncrementer stage = AssertHelper.assertInstanceOf(CounterIncrementer.class, nextStage);
assertThat(stage.getId(), is(equalTo("CounterIncrementer-0")));
return stage;
}
private CounterIncrementer assertThirdStage(CounterIncrementer stage) {
AbstractStage nextStage = stage.getOutputPort().getPipe().getTargetPort().getOwningStage();
stage = AssertHelper.assertInstanceOf(CounterIncrementer.class, nextStage);
assertThat(stage.getId(), is(equalTo("CounterIncrementer-1")));
return stage;
}
private void assertLastStage(final CounterIncrementer stage) {
AbstractStage nextStage;
nextStage = stage.getOutputPort().getPipe().getTargetPort().getOwningStage();
Sink<?> sink = AssertHelper.assertInstanceOf(Sink.class, nextStage);
assertThat(sink.getId(), is(equalTo("Sink-0")));
}
private class NestedConf extends Configuration {
......
package teetime.framework;
class CompositeCounterIncrementer extends AbstractCompositeStage {
private final InputPort<CounterContainer> inputPort;
private final OutputPort<CounterContainer> outputPort;
public CompositeCounterIncrementer(final int depth) {
if (depth <= 0) { // one counter incrementer is always created
throw new IllegalArgumentException();
}
CounterIncrementer incrementer = new CounterIncrementer();
this.inputPort = incrementer.getInputPort();
InputPort<CounterContainer> lastStageInputPort;
if (depth > 1) {
CompositeCounterIncrementer lastStage = new CompositeCounterIncrementer(depth - 1);
lastStageInputPort = lastStage.getInputPort();
outputPort = lastStage.getOutputPort();
} else {
// NoopFilter<CounterContainer> lastStage = new NoopFilter<CounterContainer>();
CounterIncrementer lastStage = incrementer;
lastStageInputPort = lastStage.getInputPort();
outputPort = lastStage.getOutputPort();
}
connectPorts(incrementer.getOutputPort(), lastStageInputPort);
}
public InputPort<CounterContainer> getInputPort() {
return inputPort;
}
public OutputPort<CounterContainer> getOutputPort() {
return outputPort;
}
}
package teetime.framework;
import teetime.stage.InitialElementProducer;
import teetime.stage.basic.Sink;
class CompositeCounterPipelineConfig extends Configuration {
private final InitialElementProducer<CounterContainer> producer;
public CompositeCounterPipelineConfig(final int numMaxCounterIncrementors, final CounterContainer element) {
if (element == null) {
throw new IllegalArgumentException("element may not be null");
}
producer = new InitialElementProducer<CounterContainer>(element);
CompositeCounterIncrementer compositeCounterIncrementor = new CompositeCounterIncrementer(numMaxCounterIncrementors);
Sink<CounterContainer> sink = new Sink<CounterContainer>();
connectPorts(producer.getOutputPort(), compositeCounterIncrementor.getInputPort());
connectPorts(compositeCounterIncrementor.getOutputPort(), sink.getInputPort());
}
public InitialElementProducer<CounterContainer> getProducer() {
return producer;
}
}
package teetime.framework;
/**
* Represents a counter which does not need to be boxed and unboxed each time it is accessed by a stage.
*
* @author Christian Wulf
*
*/
class CounterContainer {
private long counter;
public String inc() {
counter++;
return "";
}
public long getCounter() {
return counter;
}
public void reset() {
counter = 0;
}
}
package teetime.framework;
import teetime.stage.basic.AbstractFilter;
class CounterIncrementer extends AbstractFilter<CounterContainer> {
@Override
protected void execute(final CounterContainer element) {
element.inc();
getOutputPort().send(element);
}
}
package teetime.testutil;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertThat;
public final class AssertHelper {
private AssertHelper() {
// utility class
}
@SuppressWarnings("unchecked")
public static <S, T extends S> T assertInstanceOf(final Class<T> expectedClazz, final S object) {
assertThat(object, is(instanceOf(expectedClazz)));
return (T) object;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment