From b28869e140fe70a31d70d185f18026439a7f1c4d Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Mon, 22 Jun 2015 15:36:58 +0200
Subject: [PATCH] added working remove port action

---
 .../java/teetime/framework/AbstractStage.java | 19 +++++-
 .../framework/OutputPortRemovedListener.java  |  7 +++
 .../basic/distributor/CloneStrategy.java      |  6 ++
 .../distributor/CopyByReferenceStrategy.java  |  5 ++
 .../stage/basic/distributor/Distributor.java  |  8 ++-
 .../distributor/IDistributorStrategy.java     |  3 +-
 .../basic/distributor/RoundRobinStrategy.java |  8 +++
 .../distributor/RoundRobinStrategy2.java      | 13 +++-
 .../dynamic/ControlledDynamicDistributor.java |  6 ++
 .../distributor/dynamic/CreatePortAction.java |  2 +
 .../dynamic/DynamicDistributor.java           |  2 +-
 .../distributor/dynamic/RemovePortAction.java | 34 ++++++++++
 .../dynamic/ControlledDistributorTest.java    | 63 +++++++++++--------
 13 files changed, 145 insertions(+), 31 deletions(-)
 create mode 100644 src/main/java/teetime/framework/OutputPortRemovedListener.java
 create mode 100644 src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java

diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java
index 0aa030ed..27488c2d 100644
--- a/src/main/java/teetime/framework/AbstractStage.java
+++ b/src/main/java/teetime/framework/AbstractStage.java
@@ -15,6 +15,7 @@
  */
 package teetime.framework;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -35,6 +36,8 @@ public abstract class AbstractStage extends Stage {
 	private OutputPort<?>[] outputPorts = new OutputPort<?>[0];
 	private StageState currentState = StageState.CREATED;
 
+	private final Set<OutputPortRemovedListener> outputPortRemovedListeners = new HashSet<OutputPortRemovedListener>();
+
 	@Override
 	public InputPort<?>[] getInputPorts() {
 		return inputPorts;
@@ -311,8 +314,8 @@ public abstract class AbstractStage extends Stage {
 	@Override
 	protected void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) {
 		int index = dynamicOutputPort.getIndex();
-		List<OutputPort<?>> tempOutputPorts = Arrays.asList(outputPorts);
-		tempOutputPorts.remove(index);
+		List<OutputPort<?>> tempOutputPorts = new ArrayList<OutputPort<?>>(Arrays.asList(outputPorts));
+		OutputPort<?> removedOutputPort = tempOutputPorts.remove(index);
 		for (int i = index; i < tempOutputPorts.size(); i++) {
 			OutputPort<?> outputPort = tempOutputPorts.get(i);
 			if (outputPort instanceof DynamicOutputPort) {
@@ -320,6 +323,18 @@ public abstract class AbstractStage extends Stage {
 			}
 		}
 		outputPorts = tempOutputPorts.toArray(new OutputPort[0]);
+
+		fireOutputPortRemoved(removedOutputPort);
+	}
+
+	private void fireOutputPortRemoved(final OutputPort<?> removedOutputPort) {
+		for (OutputPortRemovedListener listener : outputPortRemovedListeners) {
+			listener.onOutputPortRemoved(this, removedOutputPort);
+		}
+	}
+
+	protected void addOutputPortRemovedListener(final OutputPortRemovedListener outputPortRemovedListener) {
+		outputPortRemovedListeners.add(outputPortRemovedListener);
 	}
 
 }
diff --git a/src/main/java/teetime/framework/OutputPortRemovedListener.java b/src/main/java/teetime/framework/OutputPortRemovedListener.java
new file mode 100644
index 00000000..85a71b7a
--- /dev/null
+++ b/src/main/java/teetime/framework/OutputPortRemovedListener.java
@@ -0,0 +1,7 @@
+package teetime.framework;
+
+public interface OutputPortRemovedListener {
+
+	void onOutputPortRemoved(Stage stage, OutputPort<?> removedOutputPort);
+
+}
diff --git a/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java b/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java
index 7baf292a..c41d4177 100644
--- a/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java
+++ b/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.List;
 
 import teetime.framework.OutputPort;
+import teetime.framework.Stage;
 
 /**
  * @author Nils Christian Ehmke
@@ -111,4 +112,9 @@ public final class CloneStrategy implements IDistributorStrategy {
 		return null;
 	}
 
+	@Override
+	public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) {
+		// do nothing
+	}
+
 }
diff --git a/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java b/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java
index 537aa22d..317a6eae 100644
--- a/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java
+++ b/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java
@@ -16,6 +16,7 @@
 package teetime.stage.basic.distributor;
 
 import teetime.framework.OutputPort;
+import teetime.framework.Stage;
 
 /**
  * @author Nils Christian Ehmke
@@ -33,4 +34,8 @@ public final class CopyByReferenceStrategy implements IDistributorStrategy {
 		return true;
 	}
 
+	@Override
+	public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) {
+		// do nothing
+	}
 }
diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java
index 485fb876..f780e247 100644
--- a/src/main/java/teetime/stage/basic/distributor/Distributor.java
+++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java
@@ -36,6 +36,7 @@ public class Distributor<T> extends AbstractConsumerStage<T> {
 
 	public Distributor(final IDistributorStrategy strategy) {
 		this.strategy = strategy;
+		addOutputPortRemovedListener(strategy);
 	}
 
 	@SuppressWarnings("unchecked")
@@ -45,7 +46,7 @@ public class Distributor<T> extends AbstractConsumerStage<T> {
 	}
 
 	public OutputPort<T> getNewOutputPort() {
-		return this.createOutputPort();
+		return this.createDynamicOutputPort();
 	}
 
 	public IDistributorStrategy getStrategy() {
@@ -56,4 +57,9 @@ public class Distributor<T> extends AbstractConsumerStage<T> {
 		this.strategy = strategy;
 	}
 
+	@Override
+	public OutputPort<?>[] getOutputPorts() {
+		return super.getOutputPorts();
+	}
+
 }
diff --git a/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java b/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java
index 9a404d9d..4e42dc15 100644
--- a/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java
+++ b/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java
@@ -16,13 +16,14 @@
 package teetime.stage.basic.distributor;
 
 import teetime.framework.OutputPort;
+import teetime.framework.OutputPortRemovedListener;
 
 /**
  * @author Nils Christian Ehmke
  *
  * @since 1.0
  */
-public interface IDistributorStrategy {
+public interface IDistributorStrategy extends OutputPortRemovedListener {
 
 	public <T> boolean distribute(final OutputPort<T>[] allOutputPorts, final T element);
 
diff --git a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java
index 3f7f22e7..de7df449 100644
--- a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java
+++ b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java
@@ -16,6 +16,7 @@
 package teetime.stage.basic.distributor;
 
 import teetime.framework.OutputPort;
+import teetime.framework.Stage;
 
 /**
  * @author Nils Christian Ehmke
@@ -43,4 +44,11 @@ public final class RoundRobinStrategy implements IDistributorStrategy {
 		return outputPort;
 	}
 
+	@Override
+	public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) {
+		Distributor<?> distributor = (Distributor<?>) stage;
+		// correct the index if it is out-of-bounds
+		this.index = this.index % distributor.getOutputPorts().length;
+	}
+
 }
diff --git a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java
index 829e9f7e..fb11cefc 100644
--- a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java
+++ b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy2.java
@@ -16,6 +16,7 @@
 package teetime.stage.basic.distributor;
 
 import teetime.framework.OutputPort;
+import teetime.framework.Stage;
 
 /**
  * @author Christian Wulf
@@ -33,8 +34,9 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy {
 		int numLoops = numOutputPorts;
 
 		boolean success;
+		OutputPort<T> outputPort;
 		do {
-			final OutputPort<T> outputPort = getNextPortInRoundRobinOrder(outputPorts);
+			outputPort = getNextPortInRoundRobinOrder(outputPorts);
 			success = outputPort.sendNonBlocking(element);
 			if (0 == numLoops) {
 				numWaits++;
@@ -44,6 +46,8 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy {
 			numLoops--;
 		} while (!success);
 
+		System.out.println("Sent " + element + " via " + outputPort);
+
 		return true;
 	}
 
@@ -69,4 +73,11 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy {
 		return numWaits;
 	}
 
+	@Override
+	public void onOutputPortRemoved(final Stage stage, final OutputPort<?> removedOutputPort) {
+		Distributor<?> distributor = (Distributor<?>) stage;
+		// correct the index if it is out-of-bounds
+		this.index = this.index % distributor.getOutputPorts().length;
+	}
+
 }
diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java
index a61c3949..cd6ef44b 100644
--- a/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java
+++ b/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java
@@ -1,5 +1,6 @@
 package teetime.stage.basic.distributor.dynamic;
 
+
 class ControlledDynamicDistributor<T> extends DynamicDistributor<T> {
 
 	@Override
@@ -7,4 +8,9 @@ class ControlledDynamicDistributor<T> extends DynamicDistributor<T> {
 		return portActions.take();
 	}
 
+	// @Override
+	// protected OutputPort<?>[] getOutputPorts() { // repeated declaration for testing purposes
+	// return super.getOutputPorts();
+	// }
+
 }
diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java
index 1d91c76c..7141ff95 100644
--- a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java
+++ b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java
@@ -25,6 +25,7 @@ public class CreatePortAction<T> implements PortAction<T> {
 
 	@Override
 	public void execute(final DynamicDistributor<T> dynamicDistributor) {
+		System.out.println("Creating...");
 		OutputPort<? extends T> newOutputPort = dynamicDistributor.getNewOutputPort();
 
 		INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort);
@@ -37,5 +38,6 @@ public class CreatePortAction<T> implements PortAction<T> {
 		newOutputPort.sendSignal(new StartingSignal());
 
 		// FIXME pass the new thread to the analysis so that it can terminate the thread at the end
+		System.out.println("Created.");
 	}
 }
diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java
index d3f9c39e..0520d888 100644
--- a/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java
+++ b/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java
@@ -40,7 +40,7 @@ public class DynamicDistributor<T> extends Distributor<T> {
 
 	private void checkForPendingPortActionRequest() throws InterruptedException {
 		PortAction<T> dynamicPortAction = getPortAction();
-		if (null != dynamicPortAction) {
+		if (null != dynamicPortAction) { // check if getPortAction() uses polling
 			dynamicPortAction.execute(this);
 		}
 	}
diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java
new file mode 100644
index 00000000..848445ff
--- /dev/null
+++ b/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java
@@ -0,0 +1,34 @@
+package teetime.stage.basic.distributor.dynamic;
+
+import teetime.framework.DynamicOutputPort;
+import teetime.framework.OutputPort;
+import teetime.framework.signal.TerminatingSignal;
+
+public class RemovePortAction<T> implements PortAction<T> {
+
+	private final DynamicOutputPort<T> outputPort;
+
+	public RemovePortAction(final DynamicOutputPort<T> outputPort) {
+		super();
+		this.outputPort = outputPort;
+	}
+
+	// public DynamicOutputPort<T> getOutputPort() {
+	// return outputPort;
+	// }
+
+	@Override
+	public void execute(final DynamicDistributor<T> dynamicDistributor) {
+		System.out.println("Removing...");
+		if (dynamicDistributor instanceof ControlledDynamicDistributor) {
+			OutputPort<?>[] outputPorts = ((ControlledDynamicDistributor<?>) dynamicDistributor).getOutputPorts();
+			OutputPort<?> outputPortToRemove = outputPorts[outputPorts.length - 1];
+			// outputPortToRemove = outputPort;
+
+			outputPortToRemove.sendSignal(new TerminatingSignal());
+
+			dynamicDistributor.removeDynamicPort((DynamicOutputPort<?>) outputPortToRemove);
+		}
+		System.out.println("Removed.");
+	}
+}
diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/ControlledDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/dynamic/ControlledDistributorTest.java
index b2db943f..74841da7 100644
--- a/src/test/java/teetime/stage/basic/distributor/dynamic/ControlledDistributorTest.java
+++ b/src/test/java/teetime/stage/basic/distributor/dynamic/ControlledDistributorTest.java
@@ -14,13 +14,9 @@ import org.junit.Test;
 import teetime.framework.Analysis;
 import teetime.framework.AnalysisConfiguration;
 import teetime.framework.Stage;
+import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory;
 import teetime.stage.CollectorSink;
 import teetime.stage.InitialElementProducer;
-import teetime.stage.basic.distributor.dynamic.ControlledDynamicDistributor;
-import teetime.stage.basic.distributor.dynamic.CreatePortAction;
-import teetime.stage.basic.distributor.dynamic.DoNothingPortAction;
-import teetime.stage.basic.distributor.dynamic.DynamicDistributor;
-import teetime.stage.basic.distributor.dynamic.PortAction;
 
 public class ControlledDistributorTest {
 
@@ -40,7 +36,8 @@ public class ControlledDistributorTest {
 		List<PortAction<Integer>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction);
 
 		ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, inputActions);
-		Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config);
+		Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config,
+				new TerminatingExceptionListenerFactory());
 
 		analysis.executeBlocking();
 
@@ -48,34 +45,22 @@ public class ControlledDistributorTest {
 	}
 
 	@Test
-	public void shouldWorkWithActionTriggers() throws Exception {
+	public void shouldWorkWithCreateActionTriggers() throws Exception {
 		List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4);
 
 		@SuppressWarnings("unchecked")
 		PortAction<Integer>[] inputActions = new PortAction[5];
 		for (int i = 0; i < inputActions.length; i++) {
-			CollectorSink<Integer> newStage = new CollectorSink<Integer>();
-
-			// Runnable runnable = dynamicActuator.wrap(newStage);
-			// Thread thread = new Thread(runnable);
-			// thread.start();
-
-			PortAction<Integer> createAction = new CreatePortAction<Integer>(newStage.getInputPort());
+			PortAction<Integer> createAction = createPortCreateAction();
 			inputActions[i] = createAction;
 		}
 
 		ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions));
-		Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config);
+		Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config,
+				new TerminatingExceptionListenerFactory());
 
 		analysis.executeBlocking();
 
-		for (PortAction<Integer> ia : inputActions) {
-			Stage stage = ((CreatePortAction<Integer>) ia).getInputPort().getOwningStage();
-			@SuppressWarnings("unchecked")
-			CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage;
-			System.out.println("collectorSink: " + collectorSink.getElements());
-		}
-
 		assertThat(config.getOutputElements(), contains(0));
 		assertValuesForIndex(inputActions, Arrays.asList(1), 0);
 		assertValuesForIndex(inputActions, Arrays.asList(2), 1);
@@ -84,6 +69,37 @@ public class ControlledDistributorTest {
 		assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 4);
 	}
 
+	@Test
+	public void shouldWorkWithRemoveActionTriggers() throws Exception {
+		List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4, 5);
+
+		@SuppressWarnings("unchecked")
+		PortAction<Integer>[] inputActions = new PortAction[6];
+		inputActions[0] = createPortCreateAction();
+		inputActions[1] = new RemovePortAction<Integer>(null);
+		inputActions[2] = createPortCreateAction();
+		inputActions[3] = createPortCreateAction();
+		inputActions[4] = new RemovePortAction<Integer>(null);
+		inputActions[5] = new RemovePortAction<Integer>(null);
+
+		ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions));
+		Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config,
+				new TerminatingExceptionListenerFactory());
+
+		analysis.executeBlocking();
+
+		assertThat(config.getOutputElements(), contains(0, 1, 2, 4, 5));
+		assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 0);
+		assertValuesForIndex(inputActions, Arrays.asList(3), 2);
+		assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 3);
+	}
+
+	private PortAction<Integer> createPortCreateAction() {
+		CollectorSink<Integer> newStage = new CollectorSink<Integer>();
+		PortAction<Integer> portAction = new CreatePortAction<Integer>(newStage.getInputPort());
+		return portAction;
+	}
+
 	private void assertValuesForIndex(final PortAction<Integer>[] inputActions,
 			final List<Integer> values, final int index) {
 		PortAction<Integer> ia = inputActions[index];
@@ -99,16 +115,13 @@ public class ControlledDistributorTest {
 
 		public ControlledDistributorTestConfig(final List<T> elements, final List<PortAction<T>> portActions) {
 			InitialElementProducer<T> initialElementProducer = new InitialElementProducer<T>(elements);
-			// InitialElementProducer<PortAction<T>> initialActionProducer = new InitialElementProducer<PortAction<T>>(actions);
 			DynamicDistributor<T> distributor = new ControlledDynamicDistributor<T>();
 			collectorSink = new CollectorSink<T>();
 
 			connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort());
-			// connectPorts(initialActionProducer.getOutputPort(), controlledDistributor.getDynamicPortActionInputPort());
 			connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort());
 
 			addThreadableStage(initialElementProducer);
-			// addThreadableStage(initialActionProducer); // simulates the AdaptationThread
 			addThreadableStage(distributor);
 			addThreadableStage(collectorSink);
 
-- 
GitLab