From 918a8fbb4feb509ae61cfa53a5a5dbd6d0dc26eb Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Sat, 9 May 2015 23:25:26 +0200
Subject: [PATCH] fixed signal handling in merger; refactored MergerSignalTest

---
 .../java/teetime/framework/AbstractStage.java |  8 +--
 .../java/teetime/framework/OutputPort.java    |  1 -
 .../teetime/framework/signal/ISignal.java     | 13 ++++
 .../framework/signal/InitializingSignal.java  | 23 +++----
 .../framework/signal/StartingSignal.java      | 19 ++----
 .../framework/signal/TerminatingSignal.java   | 19 ++----
 .../framework/signal/ValidatingSignal.java    |  9 ++-
 .../teetime/stage/basic/merger/Merger.java    | 62 ++++++++++-------
 .../stage/basic/merger/MergerSignalTest.java  | 66 +++++++++----------
 .../stage/basic/merger/MergerTestingPipe.java |  8 +--
 10 files changed, 118 insertions(+), 110 deletions(-)

diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java
index 666863ef..6f3cf0e2 100644
--- a/src/main/java/teetime/framework/AbstractStage.java
+++ b/src/main/java/teetime/framework/AbstractStage.java
@@ -29,7 +29,7 @@ public abstract class AbstractStage extends Stage {
 
 	private static final IPipe DUMMY_PORT = new DummyPipe();
 
-	private final Set<ISignal> triggeredSignals = new HashSet<ISignal>();
+	private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>();
 
 	private InputPort<?>[] inputPorts = new InputPort<?>[0];
 	private OutputPort<?>[] outputPorts = new OutputPort<?>[0];
@@ -73,16 +73,16 @@ public abstract class AbstractStage extends Stage {
 	 * @return <code>true</code> if this stage has already received the given <code>signal</code>, <code>false</code> otherwise
 	 */
 	protected boolean signalAlreadyReceived(final ISignal signal, final InputPort<?> inputPort) {
-		boolean signalAlreadyReceived = this.triggeredSignals.contains(signal);
+		boolean signalAlreadyReceived = this.triggeredSignalTypes.contains(signal.getClass());
 		if (signalAlreadyReceived) {
 			if (logger.isTraceEnabled()) {
-				logger.trace("Got signal: " + signal + " again from input port: " + inputPort);
+				logger.trace("Got signal again: " + signal + " from input port: " + inputPort);
 			}
 		} else {
 			if (logger.isTraceEnabled()) {
 				logger.trace("Got signal: " + signal + " from input port: " + inputPort);
 			}
-			this.triggeredSignals.add(signal);
+			this.triggeredSignalTypes.add(signal.getClass());
 		}
 		return signalAlreadyReceived;
 	}
diff --git a/src/main/java/teetime/framework/OutputPort.java b/src/main/java/teetime/framework/OutputPort.java
index 4d23baaa..beea4479 100644
--- a/src/main/java/teetime/framework/OutputPort.java
+++ b/src/main/java/teetime/framework/OutputPort.java
@@ -69,5 +69,4 @@ public final class OutputPort<T> extends AbstractPort<T> {
 		}
 		pipe.sendSignal(signal);
 	}
-
 }
diff --git a/src/main/java/teetime/framework/signal/ISignal.java b/src/main/java/teetime/framework/signal/ISignal.java
index 83326df1..eba8019a 100644
--- a/src/main/java/teetime/framework/signal/ISignal.java
+++ b/src/main/java/teetime/framework/signal/ISignal.java
@@ -15,9 +15,22 @@
  */
 package teetime.framework.signal;
 
+import java.util.Set;
+
+import teetime.framework.InputPort;
 import teetime.framework.Stage;
 
 public interface ISignal {
 
 	void trigger(Stage stage);
+
+	/**
+	 * Used by the merger only (so far)
+	 *
+	 * @param receivedInputPorts
+	 * @param allInputPorts
+	 * @param stageState
+	 * @return <code>true</code> iff the signal may be triggered, <code>false</code> otherwise.
+	 */
+	boolean mayBeTriggered(Set<InputPort<?>> receivedInputPorts, InputPort<?>[] allInputPorts);
 }
diff --git a/src/main/java/teetime/framework/signal/InitializingSignal.java b/src/main/java/teetime/framework/signal/InitializingSignal.java
index bb631edc..37fe60d2 100644
--- a/src/main/java/teetime/framework/signal/InitializingSignal.java
+++ b/src/main/java/teetime/framework/signal/InitializingSignal.java
@@ -15,33 +15,26 @@
  */
 package teetime.framework.signal;
 
-import java.util.LinkedList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Set;
 
+import teetime.framework.InputPort;
 import teetime.framework.Stage;
 
-public final class InitializingSignal implements ISignal {
-
-	private static final Logger LOGGER = LoggerFactory.getLogger(StartingSignal.class);
-	private final List<Exception> catchedExceptions = new LinkedList<Exception>();
-
-	public InitializingSignal() {}
+public final class InitializingSignal extends AbstractSignal {
 
 	@Override
 	public void trigger(final Stage stage) {
 		try {
 			stage.onInitializing();
-		} catch (Exception e) { // NOCS (Stages can throw any arbitrary Exception)
+		} catch (final Exception e) { // NOCS NOPMD (Stages can throw any arbitrary Exception)
 			this.catchedExceptions.add(e);
-			LOGGER.error("Exception while sending the start signal", e);
+			LOGGER.error("Exception while sending the initializing signal", e);
 		}
 	}
 
-	public List<Exception> getCatchedExceptions() {
-		return this.catchedExceptions;
+	@Override
+	public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) {
+		return true;
 	}
 
 }
diff --git a/src/main/java/teetime/framework/signal/StartingSignal.java b/src/main/java/teetime/framework/signal/StartingSignal.java
index aea07608..29f2a99a 100644
--- a/src/main/java/teetime/framework/signal/StartingSignal.java
+++ b/src/main/java/teetime/framework/signal/StartingSignal.java
@@ -15,20 +15,12 @@
  */
 package teetime.framework.signal;
 
-import java.util.LinkedList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Set;
 
+import teetime.framework.InputPort;
 import teetime.framework.Stage;
 
-public final class StartingSignal implements ISignal {
-
-	private static final Logger LOGGER = LoggerFactory.getLogger(StartingSignal.class);
-	private final List<Exception> catchedExceptions = new LinkedList<Exception>();
-
-	public StartingSignal() {}
+public final class StartingSignal extends AbstractSignal {
 
 	@Override
 	public void trigger(final Stage stage) {
@@ -40,8 +32,9 @@ public final class StartingSignal implements ISignal {
 		}
 	}
 
-	public List<Exception> getCatchedExceptions() {
-		return this.catchedExceptions;
+	@Override
+	public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) {
+		return true;
 	}
 
 }
diff --git a/src/main/java/teetime/framework/signal/TerminatingSignal.java b/src/main/java/teetime/framework/signal/TerminatingSignal.java
index 6ff84949..6be1b182 100644
--- a/src/main/java/teetime/framework/signal/TerminatingSignal.java
+++ b/src/main/java/teetime/framework/signal/TerminatingSignal.java
@@ -15,20 +15,12 @@
  */
 package teetime.framework.signal;
 
-import java.util.LinkedList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Set;
 
+import teetime.framework.InputPort;
 import teetime.framework.Stage;
 
-public final class TerminatingSignal implements ISignal {
-
-	private static final Logger LOGGER = LoggerFactory.getLogger(TerminatingSignal.class);
-	private final List<Exception> catchedExceptions = new LinkedList<Exception>();
-
-	public TerminatingSignal() {}
+public final class TerminatingSignal extends AbstractSignal {
 
 	@Override
 	public void trigger(final Stage stage) {
@@ -40,8 +32,9 @@ public final class TerminatingSignal implements ISignal {
 		}
 	}
 
-	public List<Exception> getCatchedExceptions() {
-		return this.catchedExceptions;
+	@Override
+	public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) {
+		return receivedInputPorts.size() == allInputPorts.length;
 	}
 
 }
diff --git a/src/main/java/teetime/framework/signal/ValidatingSignal.java b/src/main/java/teetime/framework/signal/ValidatingSignal.java
index a15b37ee..a0a4ea09 100644
--- a/src/main/java/teetime/framework/signal/ValidatingSignal.java
+++ b/src/main/java/teetime/framework/signal/ValidatingSignal.java
@@ -17,7 +17,9 @@ package teetime.framework.signal;
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 
+import teetime.framework.InputPort;
 import teetime.framework.Stage;
 import teetime.framework.validation.InvalidPortConnection;
 
@@ -25,8 +27,6 @@ public final class ValidatingSignal implements ISignal {
 
 	private final List<InvalidPortConnection> invalidPortConnections = new LinkedList<InvalidPortConnection>();
 
-	public ValidatingSignal() {}
-
 	@Override
 	public void trigger(final Stage stage) {
 		stage.onValidating(this.invalidPortConnections);
@@ -36,4 +36,9 @@ public final class ValidatingSignal implements ISignal {
 		return this.invalidPortConnections;
 	}
 
+	@Override
+	public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final InputPort<?>[] allInputPorts) {
+		return true;
+	}
+
 }
diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java
index e064fde9..4bf4ef57 100644
--- a/src/main/java/teetime/stage/basic/merger/Merger.java
+++ b/src/main/java/teetime/stage/basic/merger/Merger.java
@@ -24,9 +24,6 @@ import teetime.framework.AbstractStage;
 import teetime.framework.InputPort;
 import teetime.framework.OutputPort;
 import teetime.framework.signal.ISignal;
-import teetime.framework.signal.InitializingSignal;
-import teetime.framework.signal.StartingSignal;
-import teetime.framework.signal.TerminatingSignal;
 
 /**
  *
@@ -85,34 +82,53 @@ public final class Merger<T> extends AbstractStage {
 
 		Class<? extends ISignal> signalClass = signal.getClass();
 
+		Set<InputPort<?>> inputPorts;
 		if (signalMap.containsKey(signalClass)) {
-			Set<InputPort<?>> set = signalMap.get(signalClass);
-			if (!set.add(inputPort)) {
-				this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
-			}
-			if (signalMap.get(signalClass).size() == this.getInputPorts().length && signalClass == TerminatingSignal.class) {
-				signal.trigger(this);
-				sendSignalToOutputPorts(signal);
-				signalMap.remove(signalClass);
-			}
+			inputPorts = signalMap.get(signalClass);
 		} else {
-			Set<InputPort<?>> tempSet = new HashSet<InputPort<?>>();
-			tempSet.add(inputPort);
-			signalMap.put((Class<ISignal>) signalClass, tempSet);
-			if (signalClass == InitializingSignal.class || signalClass == StartingSignal.class) {
-				signal.trigger(this);
-				sendSignalToOutputPorts(signal);
-			}
+			inputPorts = new HashSet<InputPort<?>>();
+			signalMap.put((Class<ISignal>) signalClass, inputPorts);
 		}
 
-	}
+		if (!inputPorts.add(inputPort)) {
+			this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
+		}
 
-	private void sendSignalToOutputPorts(final ISignal signal) {
-		for (OutputPort<?> outputPort : getOutputPorts()) {
-			outputPort.sendSignal(signal);
+		if (signal.mayBeTriggered(inputPorts, getInputPorts())) {
+			super.onSignal(signal, inputPort);
 		}
+
+		// if (signalMap.containsKey(signalClass)) {
+		// Set<InputPort<?>> set = signalMap.get(signalClass);
+		// if (!set.add(inputPort)) {
+		// this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
+		// }
+		// if (signalMap.get(signalClass).size() == this.getInputPorts().length && signalClass == TerminatingSignal.class) {
+		// triggerAndPassOn(signal);
+		// // signalMap.remove(signalClass);
+		// }
+		// } else {
+		// Set<InputPort<?>> tempSet = new HashSet<InputPort<?>>();
+		// signalMap.put((Class<ISignal>) signalClass, tempSet);
+		// tempSet.add(inputPort);
+		// if (signalClass == InitializingSignal.class || signalClass == StartingSignal.class) {
+		// triggerAndPassOn(signal);
+		// }
+		// }
+
 	}
 
+	// private void triggerAndPassOn(final ISignal signal) {
+	// signal.trigger(this);
+	// sendSignalToOutputPorts(signal);
+	// }
+
+	// private void sendSignalToOutputPorts(final ISignal signal) {
+	// for (OutputPort<?> outputPort : getOutputPorts()) {
+	// outputPort.sendSignal(signal);
+	// }
+	// }
+
 	public IMergerStrategy getMergerStrategy() {
 		return this.strategy;
 	}
diff --git a/src/test/java/teetime/stage/basic/merger/MergerSignalTest.java b/src/test/java/teetime/stage/basic/merger/MergerSignalTest.java
index b1967bcd..35ca6644 100644
--- a/src/test/java/teetime/stage/basic/merger/MergerSignalTest.java
+++ b/src/test/java/teetime/stage/basic/merger/MergerSignalTest.java
@@ -18,6 +18,7 @@ package teetime.stage.basic.merger;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import org.junit.Before;
 import org.junit.Test;
 
 import teetime.framework.InputPort;
@@ -29,83 +30,80 @@ public class MergerSignalTest {
 	private Merger<Integer> merger;
 	private InputPort<Integer> firstPort;
 	private InputPort<Integer> secondPort;
-	private MergerTestingPipe testPipe;
+	private MergerTestingPipe mergerOutputPipe;
 
+	@Before
 	public void beforeSignalTesting() {
 		merger = new Merger<Integer>();
 
 		firstPort = merger.getNewInputPort();
 		secondPort = merger.getNewInputPort();
 
-		testPipe = new MergerTestingPipe();
-		merger.getOutputPort().setPipe(testPipe);
+		mergerOutputPipe = new MergerTestingPipe();
+		merger.getOutputPort().setPipe(mergerOutputPipe);
 	}
 
 	@Test
 	public void testSameSignal() {
-		this.beforeSignalTesting();
 		merger.onSignal(new StartingSignal(), firstPort);
-		assertTrue(testPipe.startSent());
-		testPipe.reset();
+		assertTrue(mergerOutputPipe.startSent());
+		mergerOutputPipe.reset();
 		merger.onSignal(new StartingSignal(), secondPort);
-		assertFalse(testPipe.startSent());
+		assertFalse(mergerOutputPipe.startSent());
 	}
 
 	@Test
 	public void testDifferentSignals() {
-		this.beforeSignalTesting();
 		merger.onSignal(new StartingSignal(), firstPort);
-		assertTrue(testPipe.startSent());
-		testPipe.reset();
+		assertTrue(mergerOutputPipe.startSent());
+		mergerOutputPipe.reset();
 
 		merger.onSignal(new TerminatingSignal(), secondPort);
-		assertFalse(testPipe.startSent());
+		assertFalse(mergerOutputPipe.startSent());
 	}
 
 	@Test
 	public void testInterleavedSignals() {
-		this.beforeSignalTesting();
 		merger.onSignal(new StartingSignal(), firstPort);
-		assertTrue(testPipe.startSent());
-		assertFalse(testPipe.terminateSent());
-		testPipe.reset();
+		assertTrue(mergerOutputPipe.startSent());
+		assertFalse(mergerOutputPipe.terminateSent());
+		mergerOutputPipe.reset();
 
 		merger.onSignal(new TerminatingSignal(), secondPort);
-		assertFalse(testPipe.startSent());
-		assertFalse(testPipe.terminateSent());
-		testPipe.reset();
+		assertFalse(mergerOutputPipe.startSent());
+		assertFalse(mergerOutputPipe.terminateSent());
+		mergerOutputPipe.reset();
 
 		merger.onSignal(new TerminatingSignal(), firstPort);
-		assertFalse(testPipe.startSent());
-		assertTrue(testPipe.terminateSent());
-		testPipe.reset();
+		assertFalse(mergerOutputPipe.startSent());
+		assertTrue(mergerOutputPipe.terminateSent());
+		mergerOutputPipe.reset();
 
 		merger.onSignal(new TerminatingSignal(), firstPort);
-		assertFalse(testPipe.startSent());
-		assertFalse(testPipe.terminateSent());
-		testPipe.reset();
+		assertFalse(mergerOutputPipe.startSent());
+		assertFalse(mergerOutputPipe.terminateSent());
+		mergerOutputPipe.reset();
 
 		merger.onSignal(new StartingSignal(), secondPort);
-		assertFalse(testPipe.startSent());
-		assertFalse(testPipe.terminateSent());
+		assertFalse(mergerOutputPipe.startSent());
+		assertFalse(mergerOutputPipe.terminateSent());
 	}
 
 	@Test
 	public void testMultipleSignals() {
-		this.beforeSignalTesting();
 		merger.onSignal(new StartingSignal(), firstPort);
-		assertTrue(testPipe.startSent());
-		testPipe.reset();
+		assertTrue(mergerOutputPipe.startSent());
+		mergerOutputPipe.reset();
 
 		merger.onSignal(new StartingSignal(), firstPort);
-		assertFalse(testPipe.startSent());
-		testPipe.reset();
+		assertFalse(mergerOutputPipe.startSent());
+		mergerOutputPipe.reset();
 
 		merger.onSignal(new StartingSignal(), firstPort);
-		assertFalse(testPipe.startSent());
-		testPipe.reset();
+		assertFalse(mergerOutputPipe.startSent());
+		mergerOutputPipe.reset();
 
 		merger.onSignal(new StartingSignal(), secondPort);
-		assertFalse(testPipe.startSent());
+		assertFalse(mergerOutputPipe.startSent());
 	}
 }
diff --git a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
index afbc6561..42696bf7 100644
--- a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
+++ b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
@@ -22,18 +22,16 @@ import teetime.framework.signal.ISignal;
 import teetime.framework.signal.StartingSignal;
 import teetime.framework.signal.TerminatingSignal;
 
-public class MergerTestingPipe implements IPipe {
+class MergerTestingPipe implements IPipe {
 
 	private boolean startSent = false;
 	private boolean terminateSent = false;
 
-	public MergerTestingPipe() {}
-
 	@Override
 	public void sendSignal(final ISignal signal) {
-		if (signal.getClass().equals(StartingSignal.class)) {
+		if (signal instanceof StartingSignal) {
 			this.startSent = true;
-		} else if (signal.getClass().equals(TerminatingSignal.class)) {
+		} else if (signal instanceof TerminatingSignal) {
 			this.terminateSent = true;
 		}
 	}
-- 
GitLab