Skip to content
Snippets Groups Projects
Commit 7beb3670 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

Merger only sends one signal, depending on its type (Starting on first;

Terminating on last)
parent 3de6f9d3
No related branches found
No related tags found
No related merge requests found
......@@ -25,6 +25,7 @@ import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.signal.ISignal;
import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.TerminatingSignal;
/**
*
......@@ -88,19 +89,27 @@ public final class Merger<T> extends AbstractStage {
if (!set.add(inputPort)) {
this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
}
if (signalMap.get(signalClass).size() == this.getInputPorts().length && signalClass == TerminatingSignal.class) {
signal.trigger(this);
sendSignalToOutputPorts(signal);
signalMap.remove(signalClass);
}
} else {
Set<InputPort<?>> tempSet = new HashSet<InputPort<?>>();
tempSet.add(inputPort);
signalMap.put((Class<ISignal>) signalClass, tempSet);
if (signalClass == StartingSignal.class) {
signal.trigger(this);
sendSignalToOutputPorts(signal);
}
}
if (signalMap.get(signalClass).size() == this.getInputPorts().length || signalClass == StartingSignal.class) {
signal.trigger(this);
this.outputPort.sendSignal(signal);
signalMap.remove(signalClass);
}
}
private void sendSignalToOutputPorts(final ISignal signal) {
for (OutputPort<?> outputPort : getOutputPorts()) {
outputPort.sendSignal(signal);
}
}
public IMergerStrategy getMergerStrategy() {
......
......@@ -15,7 +15,9 @@
*/
package teetime.stage.basic.merger;
import org.junit.Assert;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import teetime.framework.InputPort;
......@@ -43,59 +45,67 @@ public class MergerSignalTest {
public void testSameSignal() {
this.beforeSignalTesting();
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
assertTrue(testPipe.startSent());
testPipe.reset();
merger.onSignal(new StartingSignal(), secondPort);
Assert.assertTrue(testPipe.startSent());
assertFalse(testPipe.startSent());
}
@Test
public void testDifferentSignals() {
this.beforeSignalTesting();
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
assertTrue(testPipe.startSent());
testPipe.reset();
merger.onSignal(new TerminatingSignal(), secondPort);
Assert.assertFalse(testPipe.startSent());
assertFalse(testPipe.startSent());
}
@Test
public void testInterleavedSignals() {
this.beforeSignalTesting();
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
Assert.assertFalse(testPipe.terminateSent());
assertTrue(testPipe.startSent());
assertFalse(testPipe.terminateSent());
testPipe.reset();
merger.onSignal(new TerminatingSignal(), secondPort);
Assert.assertFalse(testPipe.startSent());
Assert.assertFalse(testPipe.terminateSent());
assertFalse(testPipe.startSent());
assertFalse(testPipe.terminateSent());
testPipe.reset();
merger.onSignal(new TerminatingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
Assert.assertTrue(testPipe.terminateSent());
assertFalse(testPipe.startSent());
assertTrue(testPipe.terminateSent());
testPipe.reset();
merger.onSignal(new TerminatingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
Assert.assertTrue(testPipe.terminateSent());
assertFalse(testPipe.startSent());
assertFalse(testPipe.terminateSent());
testPipe.reset();
merger.onSignal(new StartingSignal(), secondPort);
Assert.assertTrue(testPipe.startSent());
Assert.assertTrue(testPipe.terminateSent());
assertFalse(testPipe.startSent());
assertFalse(testPipe.terminateSent());
}
@Test
public void testMultipleSignals() {
this.beforeSignalTesting();
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
assertTrue(testPipe.startSent());
testPipe.reset();
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
assertFalse(testPipe.startSent());
testPipe.reset();
merger.onSignal(new StartingSignal(), firstPort);
Assert.assertFalse(testPipe.startSent());
assertFalse(testPipe.startSent());
testPipe.reset();
merger.onSignal(new StartingSignal(), secondPort);
Assert.assertTrue(testPipe.startSent());
assertFalse(testPipe.startSent());
}
}
......@@ -46,6 +46,11 @@ public class MergerTestingPipe implements IPipe {
return this.terminateSent;
}
public void reset() {
this.startSent = false;
this.terminateSent = false;
}
@Override
public boolean add(final Object element) {
return false;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment