diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index 0634134c27045328a0abfdad894dd828c26310c8..48100e5c187408fd931d7f5fe7d78ffad97ec0bd 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -16,6 +16,9 @@ package teetime.stage.basic.merger; +import java.util.HashMap; +import java.util.Map; + import teetime.framework.AbstractStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; @@ -27,10 +30,10 @@ import teetime.framework.signal.ISignal; * * @author Christian Wulf * - * @since 1.10 + * @since 1.0 * * @param <T> - * the type of the input ports and the output port + * the type of both the input and output ports */ public class Merger<T> extends AbstractStage { @@ -40,6 +43,8 @@ public class Merger<T> extends AbstractStage { private IMergerStrategy<T> strategy = new RoundRobinStrategy<T>(); + private final Map<Class<?>, Integer> signalMap = new HashMap<Class<?>, Integer>(); + @Override public void executeWithPorts() { final T token = this.strategy.getNextInput(this); @@ -54,18 +59,23 @@ public class Merger<T> extends AbstractStage { public void onSignal(final ISignal signal, final InputPort<?> inputPort) { this.logger.trace("Got signal: " + signal + " from input port: " + inputPort); - if (0 == finishedInputPorts) { + if (signalMap.containsKey(signal.getClass())) { + int value = signalMap.get(signal.getClass()); + value++; + if (value == this.getInputPorts().length) { + this.outputPort.sendSignal(signal); + signalMap.remove(signal.getClass()); + } else { + signalMap.put(signal.getClass(), value); + } + } else { signal.trigger(this); + signalMap.put(signal.getClass(), 1); } - this.finishedInputPorts++; - if (this.finishedInputPorts == this.getInputPorts().length) { - this.outputPort.sendSignal(signal); - this.finishedInputPorts = 0; - } } - public IMergerStrategy<T> getStrategy() { + public IMergerStrategy<T> getMergerStrategy() { return this.strategy; } diff --git a/src/test/java/teetime/stage/basic/merger/MergerTest.java b/src/test/java/teetime/stage/basic/merger/MergerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..a5d235ad159f3710c847139ccf3b936e1b685c77 --- /dev/null +++ b/src/test/java/teetime/stage/basic/merger/MergerTest.java @@ -0,0 +1,70 @@ +package teetime.stage.basic.merger; + +import org.junit.Assert; +import org.junit.Test; + +import teetime.framework.InputPort; +import teetime.framework.signal.StartingSignal; +import teetime.framework.signal.TerminatingSignal; + +public class MergerTest { + + private Merger<Integer> merger; + private InputPort<Integer> firstPort; + private InputPort<Integer> secondPort; + private MergerTestingPipe testPipe; + + public void beforeSignalTesting() { + merger = new Merger<Integer>(); + + firstPort = merger.getNewInputPort(); + secondPort = merger.getNewInputPort(); + + testPipe = new MergerTestingPipe(); + merger.getOutputPort().setPipe(testPipe); + } + + @Test + public void testSameSignal() { + this.beforeSignalTesting(); + merger.onSignal(new StartingSignal(), firstPort); + Assert.assertFalse(testPipe.startSent()); + + merger.onSignal(new StartingSignal(), secondPort); + Assert.assertTrue(testPipe.startSent()); + } + + @Test + public void testDifferentSignals() { + this.beforeSignalTesting(); + merger.onSignal(new StartingSignal(), firstPort); + Assert.assertFalse(testPipe.startSent()); + + merger.onSignal(new TerminatingSignal(), secondPort); + Assert.assertFalse(testPipe.startSent()); + } + + @Test + public void testInterleavedSignals() { + this.beforeSignalTesting(); + merger.onSignal(new StartingSignal(), firstPort); + Assert.assertFalse(testPipe.startSent()); + Assert.assertFalse(testPipe.terminateSent()); + + merger.onSignal(new TerminatingSignal(), secondPort); + Assert.assertFalse(testPipe.startSent()); + Assert.assertFalse(testPipe.terminateSent()); + + merger.onSignal(new TerminatingSignal(), firstPort); + Assert.assertFalse(testPipe.startSent()); + Assert.assertTrue(testPipe.terminateSent()); + + merger.onSignal(new TerminatingSignal(), firstPort); + Assert.assertFalse(testPipe.startSent()); + Assert.assertTrue(testPipe.terminateSent()); + + merger.onSignal(new StartingSignal(), firstPort); + Assert.assertTrue(testPipe.startSent()); + Assert.assertTrue(testPipe.terminateSent()); + } +} diff --git a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java new file mode 100644 index 0000000000000000000000000000000000000000..e4c47289d5d6b9fc18b17b5093ba7e4c2ab9864a --- /dev/null +++ b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java @@ -0,0 +1,81 @@ +package teetime.stage.basic.merger; + +import teetime.framework.InputPort; +import teetime.framework.OutputPort; +import teetime.framework.pipe.IPipe; +import teetime.framework.signal.ISignal; +import teetime.framework.signal.StartingSignal; +import teetime.framework.signal.TerminatingSignal; + +public class MergerTestingPipe implements IPipe { + + private boolean startSent = false; + private boolean terminateSent = false; + + public MergerTestingPipe() {} + + @Override + public void sendSignal(final ISignal signal) { + if (signal.getClass().equals(StartingSignal.class)) { + this.startSent = true; + } else if (signal.getClass().equals(TerminatingSignal.class)) { + this.terminateSent = true; + } + } + + public boolean startSent() { + return this.startSent; + } + + public boolean terminateSent() { + return this.terminateSent; + } + + @Override + public boolean add(final Object element) { + return false; + } + + @Override + public boolean isEmpty() { + // TODO Auto-generated method stub + return false; + } + + @Override + public int size() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Object removeLast() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Object readLast() { + // TODO Auto-generated method stub + return null; + } + + @Override + public InputPort<?> getTargetPort() { + // TODO Auto-generated method stub + return null; + } + + @Override + public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + // TODO Auto-generated method stub + + } + + @Override + public void reportNewElement() { + // TODO Auto-generated method stub + + } + +}