diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index c693b6488b8f478597fd093c39210198f8720488..60a3e2e89655954d6d8cd7aad6467cbd0d9b45e0 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -74,9 +74,10 @@ public final class Merger<T> extends AbstractStage { * @param inputPort * The port which the signal was sent to */ + @SuppressWarnings("unchecked") @Override public void onSignal(final ISignal signal, final InputPort<?> inputPort) { - this.logger.trace("Got signal: " + signal + " from input port: " + inputPort); + this.logger.info("Got signal: " + signal + " from input port: " + inputPort); if (signalMap.containsKey(signal.getClass())) { Set<InputPort<?>> set = signalMap.get(signal.getClass()); @@ -84,17 +85,19 @@ public final class Merger<T> extends AbstractStage { this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort); } - if (set.size() == this.getInputPorts().length) { - signal.trigger(this); - this.outputPort.sendSignal(signal); - signalMap.remove(signal.getClass()); - } } else { Set<InputPort<?>> tempSet = new HashSet<InputPort<?>>(); tempSet.add(inputPort); signalMap.put((Class<ISignal>) signal.getClass(), tempSet); } + if (signalMap.get(signal.getClass()).size() == this.getInputPorts().length) { + System.out.println("SENT"); + signal.trigger(this); + this.outputPort.sendSignal(signal); + signalMap.remove(signal.getClass()); + } + } public IMergerStrategy getMergerStrategy() { diff --git a/src/test/java/teetime/stage/WordCountingTest.java b/src/test/java/teetime/stage/WordCountingTest.java index 3f174850cc4f500f6db3b5fc1703abb85873ca2b..6fec33e68ea191bc3ee82bdf22046e0aea285104 100644 --- a/src/test/java/teetime/stage/WordCountingTest.java +++ b/src/test/java/teetime/stage/WordCountingTest.java @@ -38,7 +38,7 @@ public class WordCountingTest { @Test public void test1() throws IOException { - int threads = 2; + int threads = 1; WordCountingConfiguration wcc = new WordCountingConfiguration(threads, testFile, testFile); Analysis analysis = new Analysis(wcc); analysis.start();