From 41f0a1a94deae5c91a4ba0fe2466d18abf599fd6 Mon Sep 17 00:00:00 2001
From: Nelson Tavares de Sousa <stu103017@mail.uni-kiel.de>
Date: Fri, 21 Aug 2015 13:23:44 +0200
Subject: [PATCH] fixes #220 signals get checked, if they should be passed on
 (merger logic moved to AbstractStage)

---
 .../java/teetime/framework/AbstractStage.java | 18 ++++++++-
 .../framework/signal/TerminatingSignal.java   |  2 +-
 .../teetime/stage/basic/merger/Merger.java    | 37 -------------------
 3 files changed, 18 insertions(+), 39 deletions(-)

diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java
index 6a6769ca..42dd4f8a 100644
--- a/src/main/java/teetime/framework/AbstractStage.java
+++ b/src/main/java/teetime/framework/AbstractStage.java
@@ -15,8 +15,10 @@
  */
 package teetime.framework;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import teetime.framework.pipe.IPipe;
@@ -26,6 +28,7 @@ import teetime.util.framework.port.PortList;
 import teetime.util.framework.port.PortRemovedListener;
 
 public abstract class AbstractStage extends Stage {
+	private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>();
 
 	private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>();
 
@@ -54,7 +57,20 @@ public abstract class AbstractStage extends Stage {
 	@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
 	@Override
 	public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
-		if (!this.signalAlreadyReceived(signal, inputPort)) {
+		Class<? extends ISignal> signalClass = signal.getClass();
+
+		Set<InputPort<?>> signalReceivedInputPorts;
+		signalReceivedInputPorts = signalMap.get(signalClass);
+		if (signalReceivedInputPorts == null) {
+			signalReceivedInputPorts = new HashSet<InputPort<?>>();
+			signalMap.put(signalClass, signalReceivedInputPorts);
+		}
+
+		if (!signalReceivedInputPorts.add(inputPort)) {
+			this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
+			return;
+		}
+		if (signal.mayBeTriggered(signalReceivedInputPorts, getInputPorts())) {
 			try {
 				signal.trigger(this);
 			} catch (Exception e) {
diff --git a/src/main/java/teetime/framework/signal/TerminatingSignal.java b/src/main/java/teetime/framework/signal/TerminatingSignal.java
index f17b532a..211c57bb 100644
--- a/src/main/java/teetime/framework/signal/TerminatingSignal.java
+++ b/src/main/java/teetime/framework/signal/TerminatingSignal.java
@@ -30,7 +30,7 @@ public final class TerminatingSignal implements ISignal {
 
 	@Override
 	public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) {
-		return receivedInputPorts.size() == allInputPorts.size();
+		return receivedInputPorts.size() >= allInputPorts.size();
 	}
 
 }
diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java
index 89bc7622..250f35d8 100644
--- a/src/main/java/teetime/stage/basic/merger/Merger.java
+++ b/src/main/java/teetime/stage/basic/merger/Merger.java
@@ -16,7 +16,6 @@
 package teetime.stage.basic.merger;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -66,42 +65,6 @@ public class Merger<T> extends AbstractStage {
 		outputPort.send(token);
 	}
 
-	/**
-	 * This method is executed, if a signal is sent to a instance of this class.
-	 * Multiple signals of one certain type are ignored, if they are sent to same port.
-	 * Hence a signal is only passed on, when it arrived on all input ports, regardless how often.
-	 *
-	 * @param signal
-	 *            Signal which is sent
-	 *
-	 * @param inputPort
-	 *            The port which the signal was sent to
-	 */
-	@Override
-	public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
-		if (logger.isTraceEnabled()) {
-			this.logger.trace("Got signal: " + signal + " from input port: " + inputPort);
-		}
-
-		Class<? extends ISignal> signalClass = signal.getClass();
-
-		Set<InputPort<?>> signalReceivedInputPorts;
-		if (signalMap.containsKey(signalClass)) {
-			signalReceivedInputPorts = signalMap.get(signalClass);
-		} else {
-			signalReceivedInputPorts = new HashSet<InputPort<?>>();
-			signalMap.put(signalClass, signalReceivedInputPorts);
-		}
-
-		if (!signalReceivedInputPorts.add(inputPort)) {
-			this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
-		}
-
-		if (signal.mayBeTriggered(signalReceivedInputPorts, getInputPorts())) {
-			super.onSignal(signal, inputPort);
-		}
-	}
-
 	public IMergerStrategy getMergerStrategy() {
 		return this.strategy;
 	}
-- 
GitLab