Skip to content
Snippets Groups Projects
Commit 03e71d16 authored by Christian Wulf's avatar Christian Wulf
Browse files

Merge branch 'master' into signal-fix

parents b273388e 5516cbc0
No related branches found
No related tags found
No related merge requests found
Showing
with 284 additions and 108 deletions
...@@ -19,13 +19,13 @@ import teetime.framework.OutputPort; ...@@ -19,13 +19,13 @@ import teetime.framework.OutputPort;
/** /**
* @author Nils Christian Ehmke * @author Nils Christian Ehmke
* *
* @since 1.10 * @since 1.0
*/ */
public final class CloneStrategy<T> implements IDistributorStrategy<T> { public final class CloneStrategy implements IDistributorStrategy {
@Override @Override
public boolean distribute(final OutputPort<T>[] outputPorts, final T element) { public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
......
...@@ -19,13 +19,13 @@ import teetime.framework.OutputPort; ...@@ -19,13 +19,13 @@ import teetime.framework.OutputPort;
/** /**
* @author Nils Christian Ehmke * @author Nils Christian Ehmke
* *
* @since 1.10 * @since 1.0
*/ */
public final class CopyByReferenceStrategy<T> implements IDistributorStrategy<T> { public final class CopyByReferenceStrategy implements IDistributorStrategy {
@Override @Override
public boolean distribute(final OutputPort<T>[] outputPorts, final T element) { public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
for (final OutputPort<T> outputPort : outputPorts) { for (final OutputPort<T> outputPort : outputPorts) {
outputPort.send(element); outputPort.send(element);
} }
......
...@@ -29,7 +29,15 @@ import teetime.framework.OutputPort; ...@@ -29,7 +29,15 @@ import teetime.framework.OutputPort;
*/ */
public class Distributor<T> extends AbstractConsumerStage<T> { public class Distributor<T> extends AbstractConsumerStage<T> {
private IDistributorStrategy<T> strategy = new RoundRobinStrategy<T>(); private IDistributorStrategy strategy;
public Distributor() {
this(new RoundRobinStrategy());
}
public Distributor(final IDistributorStrategy strategy) {
this.strategy = strategy;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
...@@ -41,11 +49,11 @@ public class Distributor<T> extends AbstractConsumerStage<T> { ...@@ -41,11 +49,11 @@ public class Distributor<T> extends AbstractConsumerStage<T> {
return this.createOutputPort(); return this.createOutputPort();
} }
public IDistributorStrategy<T> getStrategy() { public IDistributorStrategy getStrategy() {
return this.strategy; return this.strategy;
} }
public void setStrategy(final IDistributorStrategy<T> strategy) { public void setStrategy(final IDistributorStrategy strategy) {
this.strategy = strategy; this.strategy = strategy;
} }
......
...@@ -19,11 +19,11 @@ import teetime.framework.OutputPort; ...@@ -19,11 +19,11 @@ import teetime.framework.OutputPort;
/** /**
* @author Nils Christian Ehmke * @author Nils Christian Ehmke
* *
* @since 1.10 * @since 1.0
*/ */
public interface IDistributorStrategy<T> { public interface IDistributorStrategy {
public boolean distribute(final OutputPort<T>[] allOutputPorts, final T element); public <T> boolean distribute(final OutputPort<T>[] allOutputPorts, final T element);
} }
...@@ -19,15 +19,15 @@ import teetime.framework.OutputPort; ...@@ -19,15 +19,15 @@ import teetime.framework.OutputPort;
/** /**
* @author Nils Christian Ehmke * @author Nils Christian Ehmke
* *
* @since 1.10 * @since 1.0
*/ */
public final class RoundRobinStrategy<T> implements IDistributorStrategy<T> { public final class RoundRobinStrategy implements IDistributorStrategy {
private int index = 0; private int index = 0;
@Override @Override
public boolean distribute(final OutputPort<T>[] outputPorts, final T element) { public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
final OutputPort<T> outputPort = this.getNextPortInRoundRobinOrder(outputPorts); final OutputPort<T> outputPort = this.getNextPortInRoundRobinOrder(outputPorts);
outputPort.send(element); outputPort.send(element);
...@@ -35,7 +35,7 @@ public final class RoundRobinStrategy<T> implements IDistributorStrategy<T> { ...@@ -35,7 +35,7 @@ public final class RoundRobinStrategy<T> implements IDistributorStrategy<T> {
return true; return true;
} }
private OutputPort<T> getNextPortInRoundRobinOrder(final OutputPort<T>[] outputPorts) { private <T> OutputPort<T> getNextPortInRoundRobinOrder(final OutputPort<T>[] outputPorts) {
final OutputPort<T> outputPort = outputPorts[this.index]; final OutputPort<T> outputPort = outputPorts[this.index];
this.index = (this.index + 1) % outputPorts.length; this.index = (this.index + 1) % outputPorts.length;
......
...@@ -17,11 +17,11 @@ package teetime.stage.basic.merger; ...@@ -17,11 +17,11 @@ package teetime.stage.basic.merger;
/** /**
* @author Nils Christian Ehmke * @author Nils Christian Ehmke
* *
* @since 1.10 * @since 1.0
*/ */
public interface IMergerStrategy<T> { public interface IMergerStrategy {
public T getNextInput(Merger<T> merger); public <T> T getNextInput(Merger<T> merger);
} }
...@@ -42,10 +42,18 @@ public final class Merger<T> extends AbstractStage { ...@@ -42,10 +42,18 @@ public final class Merger<T> extends AbstractStage {
private final OutputPort<T> outputPort = this.createOutputPort(); private final OutputPort<T> outputPort = this.createOutputPort();
private IMergerStrategy<T> strategy = new RoundRobinStrategy<T>(); private IMergerStrategy strategy;
private final Map<Class<?>, Set<InputPort<?>>> signalMap = new HashMap<Class<?>, Set<InputPort<?>>>(); private final Map<Class<?>, Set<InputPort<?>>> signalMap = new HashMap<Class<?>, Set<InputPort<?>>>();
public Merger() {
this(new RoundRobinStrategy());
}
public Merger(final IMergerStrategy strategy) {
this.strategy = strategy;
}
@Override @Override
public void executeWithPorts() { public void executeWithPorts() {
final T token = this.strategy.getNextInput(this); final T token = this.strategy.getNextInput(this);
...@@ -90,11 +98,11 @@ public final class Merger<T> extends AbstractStage { ...@@ -90,11 +98,11 @@ public final class Merger<T> extends AbstractStage {
} }
public IMergerStrategy<T> getMergerStrategy() { public IMergerStrategy getMergerStrategy() {
return this.strategy; return this.strategy;
} }
public void setStrategy(final IMergerStrategy<T> strategy) { public void setStrategy(final IMergerStrategy strategy) {
this.strategy = strategy; this.strategy = strategy;
} }
......
...@@ -19,15 +19,15 @@ import teetime.framework.InputPort; ...@@ -19,15 +19,15 @@ import teetime.framework.InputPort;
/** /**
* @author Nils Christian Ehmke * @author Nils Christian Ehmke
* *
* @since 1.10 * @since 1.0
*/ */
public final class RoundRobinStrategy<T> implements IMergerStrategy<T> { public final class RoundRobinStrategy implements IMergerStrategy {
private int index = 0; private int index = 0;
@Override @Override
public T getNextInput(final Merger<T> merger) { public <T> T getNextInput(final Merger<T> merger) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
InputPort<T>[] inputPorts = (InputPort<T>[]) merger.getInputPorts(); InputPort<T>[] inputPorts = (InputPort<T>[]) merger.getInputPorts();
int size = inputPorts.length; int size = inputPorts.length;
...@@ -42,7 +42,7 @@ public final class RoundRobinStrategy<T> implements IMergerStrategy<T> { ...@@ -42,7 +42,7 @@ public final class RoundRobinStrategy<T> implements IMergerStrategy<T> {
return null; return null;
} }
private InputPort<T> getNextPortInRoundRobinOrder(final InputPort<T>[] inputPorts) { private <T> InputPort<T> getNextPortInRoundRobinOrder(final InputPort<T>[] inputPorts) {
InputPort<T> inputPort = inputPorts[this.index]; InputPort<T> inputPort = inputPorts[this.index];
this.index = (this.index + 1) % inputPorts.length; this.index = (this.index + 1) % inputPorts.length;
......
...@@ -29,7 +29,7 @@ public final class EveryXthPrinter<T> extends Stage { ...@@ -29,7 +29,7 @@ public final class EveryXthPrinter<T> extends Stage {
pipeFactory.create(distributor.getNewOutputPort(), everyXthStage.getInputPort()); pipeFactory.create(distributor.getNewOutputPort(), everyXthStage.getInputPort());
pipeFactory.create(everyXthStage.getOutputPort(), printer.getInputPort()); pipeFactory.create(everyXthStage.getOutputPort(), printer.getInputPort());
distributor.setStrategy(new CopyByReferenceStrategy<T>()); distributor.setStrategy(new CopyByReferenceStrategy());
} }
@Override @Override
......
<?xml version="1.0" encoding="utf-8"?> <?xml version="1.0" encoding="utf-8"?>
<browserconfig> <browserconfig>
<msapplication> <msapplication>
<tile> <tile>
<square70x70logo src="/mstile-70x70.png"/> <square70x70logo src="/mstile-70x70.png"/>
<square150x150logo src="/mstile-150x150.png"/> <square150x150logo src="/mstile-150x150.png"/>
<square310x310logo src="/mstile-310x310.png"/> <square310x310logo src="/mstile-310x310.png"/>
<wide310x150logo src="/mstile-310x150.png"/> <wide310x150logo src="/mstile-310x150.png"/>
<TileColor>#2b5797</TileColor> <TileColor>#2b5797</TileColor>
</tile> </tile>
</msapplication> </msapplication>
</browserconfig> </browserconfig>
No preview for this file type
package teetime.stage.basic.distributor;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.collection.IsEmptyCollection.empty;
import static org.junit.Assert.assertThat;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.SingleElementPipeFactory;
import teetime.stage.CollectorSink;
/**
* @author Nils Christian Ehmke
*
* @since 1.0
*/
public class DistributorTest {
@Rule
public ExpectedException expectedException = ExpectedException.none();
private Distributor<Integer> distributorUnderTest;
private CollectorSink<Integer> fstCollector;
private CollectorSink<Integer> sndCollector;
@Before
public void initializeDistributor() throws Exception {
this.distributorUnderTest = new Distributor<Integer>();
this.fstCollector = new CollectorSink<Integer>();
this.sndCollector = new CollectorSink<Integer>();
final IPipeFactory pipeFactory = new SingleElementPipeFactory();
pipeFactory.create(this.distributorUnderTest.getNewOutputPort(), this.fstCollector.getInputPort());
pipeFactory.create(this.distributorUnderTest.getNewOutputPort(), this.sndCollector.getInputPort());
distributorUnderTest.onStarting();
}
@Test
public void roundRobinShouldWork() {
distributorUnderTest.setStrategy(new RoundRobinStrategy());
this.distributorUnderTest.execute(1);
this.distributorUnderTest.execute(2);
this.distributorUnderTest.execute(3);
this.distributorUnderTest.execute(4);
this.distributorUnderTest.execute(5);
assertThat(this.fstCollector.getElements(), contains(1, 3, 5));
assertThat(this.sndCollector.getElements(), contains(2, 4));
}
@Test
public void singleElementRoundRobinShouldWork() {
distributorUnderTest.setStrategy(new RoundRobinStrategy());
this.distributorUnderTest.execute(1);
assertThat(this.fstCollector.getElements(), contains(1));
assertThat(this.sndCollector.getElements(), is(empty()));
}
@Test
public void copyByReferenceShouldWork() {
distributorUnderTest.setStrategy(new CopyByReferenceStrategy());
this.distributorUnderTest.execute(1);
this.distributorUnderTest.execute(2);
this.distributorUnderTest.execute(3);
this.distributorUnderTest.execute(4);
this.distributorUnderTest.execute(5);
assertThat(this.fstCollector.getElements(), contains(1, 2, 3, 4, 5));
assertThat(this.sndCollector.getElements(), contains(1, 2, 3, 4, 5));
}
@Test
public void singleElementCopyByReferenceShouldWork() {
distributorUnderTest.setStrategy(new CopyByReferenceStrategy());
this.distributorUnderTest.execute(1);
assertThat(this.fstCollector.getElements(), contains(1));
assertThat(this.sndCollector.getElements(), contains(1));
}
@Test
public void cloneShouldNotWork() {
distributorUnderTest.setStrategy(new CloneStrategy());
expectedException.expect(UnsupportedOperationException.class);
this.distributorUnderTest.execute(1);
}
}
package teetime.stage.basic.merger;
import org.junit.Assert;
import org.junit.Test;
import teetime.framework.InputPort;
import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.TerminatingSignal;
public class MergerSignalTest {
private Merger<Integer> merger;
private InputPort<Integer> firstPort;
private InputPort<Integer> secondPort;
private MergerTestingPipe testPipe;
public void beforeSignalTesting() {
merger = new Merger<Integer>();
firstPort = merger.getNewInputPort();
secondPort = merger.getNewInputPort();
testPipe = new MergerTestingPipe();
merger.getOutputPort().setPipe(testPipe);
}
@Test
public void testSameSignal() {
this.beforeSignalTesting();
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
merger.onSignal(new StartingSignal(), secondPort);
Assert.assertTrue(testPipe.startSent());
}
@Test
public void testDifferentSignals() {
this.beforeSignalTesting();
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
merger.onSignal(new TerminatingSignal(), secondPort);
Assert.assertFalse(testPipe.startSent());
}
@Test
public void testInterleavedSignals() {
this.beforeSignalTesting();
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
Assert.assertFalse(testPipe.terminateSent());
merger.onSignal(new TerminatingSignal(), secondPort);
Assert.assertFalse(testPipe.startSent());
Assert.assertFalse(testPipe.terminateSent());
merger.onSignal(new TerminatingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
Assert.assertTrue(testPipe.terminateSent());
merger.onSignal(new TerminatingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
Assert.assertTrue(testPipe.terminateSent());
merger.onSignal(new StartingSignal(), secondPort);
Assert.assertTrue(testPipe.startSent());
Assert.assertTrue(testPipe.terminateSent());
}
@Test
public void testMultipleSignals() {
this.beforeSignalTesting();
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
merger.onSignal(new StartingSignal(), secondPort);
Assert.assertTrue(testPipe.startSent());
}
}
package teetime.stage.basic.merger; package teetime.stage.basic.merger;
import org.junit.Assert; import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertThat;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import teetime.framework.InputPort; import teetime.framework.pipe.IPipeFactory;
import teetime.framework.signal.StartingSignal; import teetime.framework.pipe.SingleElementPipeFactory;
import teetime.framework.signal.TerminatingSignal; import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
/**
* @author Nils Christian Ehmke
*
* @since 1.0
*/
public class MergerTest { public class MergerTest {
private Merger<Integer> merger; private Merger<Integer> mergerUnderTest;
private InputPort<Integer> firstPort; private CollectorSink<Integer> collector;
private InputPort<Integer> secondPort; private InitialElementProducer<Integer> fstProducer;
private MergerTestingPipe testPipe; private InitialElementProducer<Integer> sndProducer;
public void beforeSignalTesting() { @Before
merger = new Merger<Integer>(); public void initializeMerger() throws Exception {
this.mergerUnderTest = new Merger<Integer>();
this.collector = new CollectorSink<Integer>();
this.fstProducer = new InitialElementProducer<Integer>(1, 2, 3);
this.sndProducer = new InitialElementProducer<Integer>(4, 5, 6);
firstPort = merger.getNewInputPort(); final IPipeFactory pipeFactory = new SingleElementPipeFactory();
secondPort = merger.getNewInputPort(); pipeFactory.create(this.fstProducer.getOutputPort(), this.mergerUnderTest.getNewInputPort());
pipeFactory.create(this.sndProducer.getOutputPort(), this.mergerUnderTest.getNewInputPort());
pipeFactory.create(this.mergerUnderTest.getOutputPort(), this.collector.getInputPort());
testPipe = new MergerTestingPipe(); mergerUnderTest.onStarting();
merger.getOutputPort().setPipe(testPipe);
} }
@Test @Test
public void testSameSignal() { public void roundRobinShouldWork() {
this.beforeSignalTesting(); mergerUnderTest.setStrategy(new RoundRobinStrategy());
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
merger.onSignal(new StartingSignal(), secondPort);
Assert.assertTrue(testPipe.startSent());
}
@Test this.fstProducer.executeWithPorts();
public void testDifferentSignals() { this.sndProducer.executeWithPorts();
this.beforeSignalTesting();
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
merger.onSignal(new TerminatingSignal(), secondPort); assertThat(this.collector.getElements(), contains(1, 2, 3, 4, 5, 6));
Assert.assertFalse(testPipe.startSent());
} }
@Test @Test
public void testInterleavedSignals() { public void roundRobinWithSingleProducerShouldWork() {
this.beforeSignalTesting(); mergerUnderTest.setStrategy(new RoundRobinStrategy());
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
Assert.assertFalse(testPipe.terminateSent());
merger.onSignal(new TerminatingSignal(), secondPort);
Assert.assertFalse(testPipe.startSent());
Assert.assertFalse(testPipe.terminateSent());
merger.onSignal(new TerminatingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
Assert.assertTrue(testPipe.terminateSent());
merger.onSignal(new TerminatingSignal(), firstPort); this.fstProducer.executeWithPorts();
Assert.assertFalse(testPipe.startSent());
Assert.assertTrue(testPipe.terminateSent());
merger.onSignal(new StartingSignal(), secondPort); assertThat(this.collector.getElements(), contains(1, 2, 3));
Assert.assertTrue(testPipe.startSent());
Assert.assertTrue(testPipe.terminateSent());
} }
@Test
public void testMultipleSignals() {
this.beforeSignalTesting();
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
merger.onSignal(new StartingSignal(), secondPort);
Assert.assertTrue(testPipe.startSent());
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment