From cf92e069514dbc9e01c9b02ccf8ac0d6a6d3089e Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Fri, 26 Jun 2015 13:35:54 +0200
Subject: [PATCH] added working ControlledMergerTest

---
 .settings/edu.umd.cs.findbugs.core.prefs      |  2 +-
 .../teetime/framework/DynamicActuator.java    |  5 +-
 .../java/teetime/framework/Execution.java     |  3 +
 .../framework/RunnableConsumerStage.java      | 18 +++---
 .../framework/RunnableProducerStage.java      | 10 ++--
 .../merger/BusyWaitingRoundRobinStrategy.java | 58 +++++++++++++++++++
 .../teetime/stage/basic/merger/Merger.java    |  5 +-
 .../basic/merger/RoundRobinStrategy.java      |  3 +-
 .../dynamic/ControlledDynamicMerger.java      | 15 -----
 .../basic/merger/dynamic/DynamicMerger.java   |  7 ++-
 .../merger/dynamic/RemovePortAction.java      | 14 ++---
 .../util/framework/port/PortActionHelper.java |  2 +-
 src/site/markdown/wiki                        |  1 -
 .../merger/dynamic/ControlledMergerTest.java  | 25 ++++----
 src/test/resources/logback-test.xml           |  3 +
 15 files changed, 113 insertions(+), 58 deletions(-)
 create mode 100644 src/main/java/teetime/stage/basic/merger/BusyWaitingRoundRobinStrategy.java
 delete mode 100644 src/main/java/teetime/stage/basic/merger/dynamic/ControlledDynamicMerger.java
 delete mode 160000 src/site/markdown/wiki

diff --git a/.settings/edu.umd.cs.findbugs.core.prefs b/.settings/edu.umd.cs.findbugs.core.prefs
index e6f45f06..47121da3 100644
--- a/.settings/edu.umd.cs.findbugs.core.prefs
+++ b/.settings/edu.umd.cs.findbugs.core.prefs
@@ -1,5 +1,5 @@
 #FindBugs User Preferences
-#Mon Jun 22 16:47:12 CEST 2015
+#Thu Jun 25 14:06:30 CEST 2015
 detector_threshold=2
 effort=max
 excludefilter0=.fbExcludeFilterFile|true
diff --git a/src/main/java/teetime/framework/DynamicActuator.java b/src/main/java/teetime/framework/DynamicActuator.java
index bd8e6dd3..86e41e0e 100644
--- a/src/main/java/teetime/framework/DynamicActuator.java
+++ b/src/main/java/teetime/framework/DynamicActuator.java
@@ -6,16 +6,17 @@ public class DynamicActuator {
 	 * @deprecated Use {@link #startWithinNewThread(Stage)} instead.
 	 */
 	@Deprecated
-	public Runnable wrap(final Stage stage) {
+	public AbstractRunnableStage wrap(final Stage stage) {
 		if (stage.getInputPorts().length > 0) {
 			return new RunnableConsumerStage(stage);
 		}
 		return new RunnableProducerStage(stage);
 	}
 
-	public void startWithinNewThread(final Stage stage) {
+	public Runnable startWithinNewThread(final Stage stage) {
 		Runnable runnable = wrap(stage);
 		Thread thread = new Thread(runnable);
 		thread.start();
+		return runnable;
 	}
 }
diff --git a/src/main/java/teetime/framework/Execution.java b/src/main/java/teetime/framework/Execution.java
index fd15a5af..dd01458b 100644
--- a/src/main/java/teetime/framework/Execution.java
+++ b/src/main/java/teetime/framework/Execution.java
@@ -203,10 +203,12 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
 	 */
 	public void waitForTermination() {
 		try {
+			LOGGER.debug("Waiting for finiteProducerThreads");
 			for (Thread thread : this.finiteProducerThreads) {
 				thread.join();
 			}
 
+			LOGGER.debug("Waiting for consumerThreads");
 			for (Thread thread : this.consumerThreads) {
 				thread.join();
 			}
@@ -221,6 +223,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
 			}
 		}
 
+		LOGGER.debug("Interrupting infiniteProducerThreads...");
 		for (Thread thread : this.infiniteProducerThreads) {
 			thread.interrupt();
 		}
diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java
index c1a78e20..0043ef7d 100644
--- a/src/main/java/teetime/framework/RunnableConsumerStage.java
+++ b/src/main/java/teetime/framework/RunnableConsumerStage.java
@@ -22,9 +22,6 @@ import teetime.framework.signal.TerminatingSignal;
 
 final class RunnableConsumerStage extends AbstractRunnableStage {
 
-	// cache the input ports here since getInputPorts() always returns a new copy
-	private final InputPort<?>[] inputPorts;
-
 	/**
 	 * Creates a new instance with the {@link YieldStrategy} as default idle strategy.
 	 *
@@ -37,17 +34,17 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
 
 	public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy) {
 		super(stage);
-		this.inputPorts = stage.getInputPorts(); // FIXME should getInputPorts() really be defined in Stage?
 	}
 
 	@SuppressWarnings("PMD.GuardLogStatement")
 	@Override
 	protected void beforeStageExecution() throws InterruptedException {
-		logger.trace("Waiting for start signals... " + stage);
-		for (InputPort<?> inputPort : inputPorts) {
+		logger.trace("Waiting for init signals... " + stage);
+		for (InputPort<?> inputPort : stage.getInputPorts()) {
 			inputPort.waitForInitializingSignal();
 		}
-		for (InputPort<?> inputPort : inputPorts) {
+		logger.trace("Waiting for start signals... " + stage);
+		for (InputPort<?> inputPort : stage.getInputPorts()) {
 			inputPort.waitForStartSignal();
 		}
 		logger.trace("Starting... " + stage);
@@ -63,7 +60,10 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
 	}
 
 	private void checkForTerminationSignal(final Stage stage) {
-		for (InputPort<?> inputPort : inputPorts) {
+		System.out.println("checkForTerminationSignal: " + stage);
+		// FIXME should getInputPorts() really be defined in Stage?
+		for (InputPort<?> inputPort : stage.getInputPorts()) {
+			System.out.println("\tclosed: " + inputPort.isClosed() + " (" + inputPort);
 			if (!inputPort.isClosed()) {
 				return;
 			}
@@ -75,7 +75,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
 	@Override
 	protected void afterStageExecution() {
 		final ISignal signal = new TerminatingSignal();
-		for (InputPort<?> inputPort : inputPorts) {
+		for (InputPort<?> inputPort : stage.getInputPorts()) {
 			stage.onSignal(signal, inputPort);
 		}
 	}
diff --git a/src/main/java/teetime/framework/RunnableProducerStage.java b/src/main/java/teetime/framework/RunnableProducerStage.java
index f8e9eb48..f25f4744 100644
--- a/src/main/java/teetime/framework/RunnableProducerStage.java
+++ b/src/main/java/teetime/framework/RunnableProducerStage.java
@@ -21,12 +21,12 @@ import teetime.framework.signal.InitializingSignal;
 import teetime.framework.signal.StartingSignal;
 import teetime.framework.signal.TerminatingSignal;
 
-final class RunnableProducerStage extends AbstractRunnableStage {
+public final class RunnableProducerStage extends AbstractRunnableStage {
 
 	private final Semaphore startSemaphore = new Semaphore(0);
 	private final Semaphore initSemaphore = new Semaphore(0);
 
-	public RunnableProducerStage(final Stage stage) {
+	RunnableProducerStage(final Stage stage) {
 		super(stage);
 	}
 
@@ -57,11 +57,13 @@ final class RunnableProducerStage extends AbstractRunnableStage {
 		startSemaphore.release();
 	}
 
-	public void waitForInitializingSignal() throws InterruptedException {
+	private void waitForInitializingSignal() throws InterruptedException {
+		logger.trace("waitForInitializingSignal");
 		initSemaphore.acquire();
 	}
 
-	public void waitForStartingSignal() throws InterruptedException {
+	private void waitForStartingSignal() throws InterruptedException {
+		logger.trace("waitForStartingSignal");
 		startSemaphore.acquire();
 	}
 }
diff --git a/src/main/java/teetime/stage/basic/merger/BusyWaitingRoundRobinStrategy.java b/src/main/java/teetime/stage/basic/merger/BusyWaitingRoundRobinStrategy.java
new file mode 100644
index 00000000..a440a1cf
--- /dev/null
+++ b/src/main/java/teetime/stage/basic/merger/BusyWaitingRoundRobinStrategy.java
@@ -0,0 +1,58 @@
+/**
+ * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package teetime.stage.basic.merger;
+
+import teetime.framework.InputPort;
+import teetime.framework.NotEnoughInputException;
+
+/**
+ * @author Christian Wulf
+ *
+ * @since 2.0
+ */
+public final class BusyWaitingRoundRobinStrategy implements IMergerStrategy {
+
+	private int index = 0;
+
+	@Override
+	public <T> T getNextInput(final Merger<T> merger) {
+		final InputPort<T>[] inputPorts = merger.getInputPorts();
+		final InputPort<T> inputPort = getOpenInputPort(inputPorts);
+
+		final T token = inputPort.receive();
+		if (null != token) {
+			this.index = (this.index + 1) % inputPorts.length;
+		}
+
+		return token;
+	}
+
+	private <T> InputPort<T> getOpenInputPort(final InputPort<T>[] inputPorts) {
+		final int startedIndex = index;
+
+		InputPort<T> inputPort = inputPorts[this.index];
+		while (inputPort.isClosed()) {
+			this.index = (this.index + 1) % inputPorts.length;
+			if (index == startedIndex) {
+				throw new NotEnoughInputException();
+			}
+			inputPort = inputPorts[this.index];
+		}
+
+		return inputPort;
+	}
+
+}
diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java
index 20eb73bf..df36520d 100644
--- a/src/main/java/teetime/stage/basic/merger/Merger.java
+++ b/src/main/java/teetime/stage/basic/merger/Merger.java
@@ -102,9 +102,10 @@ public class Merger<T> extends AbstractStage {
 		return this.strategy;
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
-	public InputPort<?>[] getInputPorts() { // make public
-		return super.getInputPorts();
+	public InputPort<T>[] getInputPorts() { // make public
+		return (InputPort<T>[]) super.getInputPorts();
 	}
 
 	public InputPort<T> getNewInputPort() {
diff --git a/src/main/java/teetime/stage/basic/merger/RoundRobinStrategy.java b/src/main/java/teetime/stage/basic/merger/RoundRobinStrategy.java
index 38dc321a..89dee9a1 100644
--- a/src/main/java/teetime/stage/basic/merger/RoundRobinStrategy.java
+++ b/src/main/java/teetime/stage/basic/merger/RoundRobinStrategy.java
@@ -28,8 +28,7 @@ public final class RoundRobinStrategy implements IMergerStrategy {
 
 	@Override
 	public <T> T getNextInput(final Merger<T> merger) {
-		@SuppressWarnings("unchecked")
-		InputPort<T>[] inputPorts = (InputPort<T>[]) merger.getInputPorts();
+		final InputPort<T>[] inputPorts = merger.getInputPorts();
 		int size = inputPorts.length;
 		// check each port at most once to avoid a potentially infinite loop
 		while (size-- > 0) {
diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/ControlledDynamicMerger.java b/src/main/java/teetime/stage/basic/merger/dynamic/ControlledDynamicMerger.java
deleted file mode 100644
index 6a905b64..00000000
--- a/src/main/java/teetime/stage/basic/merger/dynamic/ControlledDynamicMerger.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package teetime.stage.basic.merger.dynamic;
-
-import teetime.util.framework.port.PortActionHelper;
-
-public class ControlledDynamicMerger<T> extends DynamicMerger<T> {
-
-	@Override
-	protected void checkForPendingPortActionRequest() {
-		try {
-			PortActionHelper.checkBlockingForPendingPortActionRequest(this, portActions);
-		} catch (InterruptedException e) {
-			throw new IllegalStateException(e);
-		}
-	}
-}
diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java b/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java
index c4de167d..ce7016c5 100644
--- a/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java
+++ b/src/main/java/teetime/stage/basic/merger/dynamic/DynamicMerger.java
@@ -3,6 +3,7 @@ package teetime.stage.basic.merger.dynamic;
 import java.util.concurrent.BlockingQueue;
 
 import teetime.framework.DynamicInputPort;
+import teetime.stage.basic.merger.IMergerStrategy;
 import teetime.stage.basic.merger.Merger;
 import teetime.util.framework.port.PortAction;
 import teetime.util.framework.port.PortActionHelper;
@@ -11,15 +12,15 @@ public class DynamicMerger<T> extends Merger<T> {
 
 	protected final BlockingQueue<PortAction<DynamicMerger<T>>> portActions;
 
-	public DynamicMerger() {
+	public DynamicMerger(final IMergerStrategy strategy) {
+		super(strategy);
 		portActions = PortActionHelper.createPortActionQueue();
 	}
 
 	@Override
 	public void executeStage() {
+		super.executeStage(); // must be first, to throw NotEnoughInputException before checking
 		checkForPendingPortActionRequest();
-
-		super.executeStage();
 	}
 
 	protected void checkForPendingPortActionRequest() {
diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java b/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java
index 6abcebde..01956251 100644
--- a/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java
+++ b/src/main/java/teetime/stage/basic/merger/dynamic/RemovePortAction.java
@@ -17,13 +17,13 @@ public class RemovePortAction<T> implements PortAction<DynamicMerger<T>> {
 	public void execute(final DynamicMerger<T> dynamicMerger) {
 		InputPort<?> inputPortsToRemove;
 
-		if (dynamicMerger instanceof ControlledDynamicMerger) {
-			// for testing purposes only
-			InputPort<?>[] inputPorts = ((ControlledDynamicMerger<?>) dynamicMerger).getInputPorts();
-			inputPortsToRemove = inputPorts[inputPorts.length - 1];
-		} else {
-			inputPortsToRemove = inputPort;
-		}
+		// if (dynamicMerger instanceof ControlledDynamicMerger) {
+		// // for testing purposes only
+		// InputPort<?>[] inputPorts = ((ControlledDynamicMerger<?>) dynamicMerger).getInputPorts();
+		// inputPortsToRemove = inputPorts[inputPorts.length - 1];
+		// } else {
+		inputPortsToRemove = inputPort;
+		// }
 
 		dynamicMerger.removeDynamicPort((DynamicInputPort<?>) inputPortsToRemove);
 	}
diff --git a/src/main/java/teetime/util/framework/port/PortActionHelper.java b/src/main/java/teetime/util/framework/port/PortActionHelper.java
index ac7c2e23..86c23563 100644
--- a/src/main/java/teetime/util/framework/port/PortActionHelper.java
+++ b/src/main/java/teetime/util/framework/port/PortActionHelper.java
@@ -31,7 +31,7 @@ public final class PortActionHelper {
 
 	public static <T extends Stage> void checkForPendingPortActionRequest(final T stage, final BlockingQueue<PortAction<T>> portActions) {
 		PortAction<T> dynamicPortAction = portActions.poll();
-		if (null != dynamicPortAction) { // check if getPortAction() uses polling
+		if (null != dynamicPortAction) {
 			dynamicPortAction.execute(stage);
 		}
 	}
diff --git a/src/site/markdown/wiki b/src/site/markdown/wiki
deleted file mode 160000
index 0a5bd4dd..00000000
--- a/src/site/markdown/wiki
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 0a5bd4ddb82684ce1ae2ec84c67ff2117ebff143
diff --git a/src/test/java/teetime/stage/basic/merger/dynamic/ControlledMergerTest.java b/src/test/java/teetime/stage/basic/merger/dynamic/ControlledMergerTest.java
index 2304e0fa..a6c84126 100644
--- a/src/test/java/teetime/stage/basic/merger/dynamic/ControlledMergerTest.java
+++ b/src/test/java/teetime/stage/basic/merger/dynamic/ControlledMergerTest.java
@@ -3,6 +3,7 @@ package teetime.stage.basic.merger.dynamic;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
 import java.util.List;
@@ -12,11 +13,11 @@ import org.junit.Test;
 import teetime.framework.Configuration;
 import teetime.framework.DynamicActuator;
 import teetime.framework.Execution;
+import teetime.framework.RunnableProducerStage;
 import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory;
-import teetime.framework.signal.InitializingSignal;
-import teetime.framework.signal.StartingSignal;
 import teetime.stage.CollectorSink;
 import teetime.stage.InitialElementProducer;
+import teetime.stage.basic.merger.BusyWaitingRoundRobinStrategy;
 import teetime.util.framework.port.PortAction;
 
 public class ControlledMergerTest {
@@ -25,10 +26,10 @@ public class ControlledMergerTest {
 
 	@Test
 	public void shouldWorkWithoutActionTriggers() throws Exception {
-		List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4);
+		List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4, 5);
 
 		@SuppressWarnings("unchecked")
-		PortAction<DynamicMerger<Integer>>[] inputActions = new PortAction[5];
+		PortAction<DynamicMerger<Integer>>[] inputActions = new PortAction[6];
 		for (int i = 0; i < inputActions.length; i++) {
 			inputActions[i] = new DoNothingPortAction<Integer>();
 		}
@@ -39,7 +40,7 @@ public class ControlledMergerTest {
 
 		analysis.executeBlocking();
 
-		assertThat(config.getOutputElements(), contains(0, 1, 2, 3, 4));
+		assertThat(config.getOutputElements(), contains(0, 1, 2, 3, 4, 5));
 	}
 
 	@Test
@@ -47,7 +48,7 @@ public class ControlledMergerTest {
 		List<Integer> inputNumbers = Arrays.asList(0);
 
 		@SuppressWarnings("unchecked")
-		PortAction<DynamicMerger<Integer>>[] inputActions = new PortAction[5];
+		PortAction<DynamicMerger<Integer>>[] inputActions = new PortAction[6];
 		for (int i = 0; i < inputActions.length; i++) {
 			inputActions[i] = createPortCreateAction(i + 1);
 		}
@@ -88,14 +89,15 @@ public class ControlledMergerTest {
 
 	private PortAction<DynamicMerger<Integer>> createPortCreateAction(final Integer number) {
 		final InitialElementProducer<Integer> initialElementProducer = new InitialElementProducer<Integer>(number);
-		DYNAMIC_ACTUATOR.startWithinNewThread(initialElementProducer);
+		final Runnable runnableStage = DYNAMIC_ACTUATOR.startWithinNewThread(initialElementProducer);
 
 		PortAction<DynamicMerger<Integer>> portAction = new CreatePortAction<Integer>(initialElementProducer.getOutputPort()) {
 			@Override
 			public void execute(final DynamicMerger<Integer> dynamicDistributor) {
 				super.execute(dynamicDistributor);
-				initialElementProducer.getOutputPort().sendSignal(new InitializingSignal());
-				initialElementProducer.getOutputPort().sendSignal(new StartingSignal());
+				final RunnableProducerStage runnableProducerStage = (RunnableProducerStage) runnableStage;
+				runnableProducerStage.triggerInitializingSignal();
+				runnableProducerStage.triggerStartingSignal();
 			}
 		};
 		return portAction;
@@ -107,7 +109,7 @@ public class ControlledMergerTest {
 
 		public ControlledMergerTestConfig(final List<T> elements, final List<PortAction<DynamicMerger<T>>> inputActions) {
 			InitialElementProducer<T> initialElementProducer = new InitialElementProducer<T>(elements);
-			DynamicMerger<T> merger = new ControlledDynamicMerger<T>();
+			DynamicMerger<T> merger = new DynamicMerger<T>(new BusyWaitingRoundRobinStrategy());
 			collectorSink = new CollectorSink<T>();
 
 			connectPorts(initialElementProducer.getOutputPort(), merger.getNewInputPort());
@@ -116,7 +118,8 @@ public class ControlledMergerTest {
 			addThreadableStage(merger);
 
 			for (PortAction<DynamicMerger<T>> a : inputActions) {
-				merger.addPortActionRequest(a);
+				boolean added = merger.addPortActionRequest(a);
+				assertTrue(added);
 			}
 		}
 
diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml
index 45736549..e1bcc378 100644
--- a/src/test/resources/logback-test.xml
+++ b/src/test/resources/logback-test.xml
@@ -22,6 +22,9 @@
 	
 	<logger name="teetime" level="INFO" />
 	<logger name="teetime.framework" level="TRACE" />
+	<logger name="teetime.stage.InitialElementProducer" level="DEBUG" />
+	<logger name="teetime.stage.merger" level="TRACE" />
+<!-- 	<logger name="teetime.stage" level="TRACE" /> -->
 <!-- 	<logger name="teetime.framework.signal" level="TRACE" /> -->
 <!-- 	<logger name="teetime.stage" level="TRACE" /> -->
 	<logger name="util" level="INFO" />
-- 
GitLab