From f31548429dab4f91bcdcff94d17ccca662e6eb8b Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Fri, 19 Jun 2015 11:00:44 +0200
Subject: [PATCH] added port actions

---
 .../java/teetime/framework/AbstractStage.java |  22 +++-
 .../teetime/framework/DynamicOutputPort.java  |  29 +++++
 .../java/teetime/framework/OutputPort.java    |   2 +-
 src/main/java/teetime/framework/Stage.java    |   2 +
 .../distributor/ControlledDistributor.java    | 121 ------------------
 .../basic/distributor/DynamicDistributor.java |  53 --------
 .../dynamic/ControlledDynamicDistributor.java |  10 ++
 .../CreatePortAction.java}                    |  27 ++--
 .../dynamic/DoNothingPortAction.java          |  10 ++
 .../dynamic/DynamicDistributor.java           |  60 +++++++++
 .../basic/distributor/dynamic/PortAction.java |   8 ++
 .../ControlledDistributorTest.java            |  52 ++++----
 12 files changed, 177 insertions(+), 219 deletions(-)
 create mode 100644 src/main/java/teetime/framework/DynamicOutputPort.java
 delete mode 100644 src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java
 delete mode 100644 src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java
 create mode 100644 src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java
 rename src/main/java/teetime/stage/basic/distributor/{DynamicPortActionContainer.java => dynamic/CreatePortAction.java} (56%)
 create mode 100644 src/main/java/teetime/stage/basic/distributor/dynamic/DoNothingPortAction.java
 create mode 100644 src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java
 create mode 100644 src/main/java/teetime/stage/basic/distributor/dynamic/PortAction.java

diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java
index 7d3c023f..0aa030ed 100644
--- a/src/main/java/teetime/framework/AbstractStage.java
+++ b/src/main/java/teetime/framework/AbstractStage.java
@@ -266,7 +266,7 @@ public abstract class AbstractStage extends Stage {
 		return outputPort;
 	}
 
-	private <T> T[] addElementToArray(final T element, final T[] srcArray) {
+	private <T, E extends T> T[] addElementToArray(final E element, final T[] srcArray) {
 		T[] newOutputPorts = Arrays.copyOf(srcArray, srcArray.length + 1);
 		newOutputPorts[srcArray.length] = element;
 		return newOutputPorts;
@@ -302,4 +302,24 @@ public abstract class AbstractStage extends Stage {
 		return TerminationStrategy.BY_SIGNAL;
 	}
 
+	protected <T> DynamicOutputPort<T> createDynamicOutputPort() {
+		final DynamicOutputPort<T> outputPort = new DynamicOutputPort<T>(null, this, outputPorts.length);
+		outputPorts = addElementToArray(outputPort, outputPorts);
+		return outputPort;
+	}
+
+	@Override
+	protected void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) {
+		int index = dynamicOutputPort.getIndex();
+		List<OutputPort<?>> tempOutputPorts = Arrays.asList(outputPorts);
+		tempOutputPorts.remove(index);
+		for (int i = index; i < tempOutputPorts.size(); i++) {
+			OutputPort<?> outputPort = tempOutputPorts.get(i);
+			if (outputPort instanceof DynamicOutputPort) {
+				((DynamicOutputPort<?>) outputPort).setIndex(i);
+			}
+		}
+		outputPorts = tempOutputPorts.toArray(new OutputPort[0]);
+	}
+
 }
diff --git a/src/main/java/teetime/framework/DynamicOutputPort.java b/src/main/java/teetime/framework/DynamicOutputPort.java
new file mode 100644
index 00000000..bc2a4a45
--- /dev/null
+++ b/src/main/java/teetime/framework/DynamicOutputPort.java
@@ -0,0 +1,29 @@
+package teetime.framework;
+
+/**
+ *
+ * @author Christian Wulf
+ *
+ * @param <T>
+ *            the type of elements to be sent
+ *
+ * @since 1.2
+ */
+public final class DynamicOutputPort<T> extends OutputPort<T> {
+
+	private int index;
+
+	DynamicOutputPort(final Class<T> type, final Stage owningStage, final int index) {
+		super(type, owningStage);
+		this.index = index;
+	}
+
+	public int getIndex() {
+		return index;
+	}
+
+	public void setIndex(final int index) {
+		this.index = index;
+	}
+
+}
diff --git a/src/main/java/teetime/framework/OutputPort.java b/src/main/java/teetime/framework/OutputPort.java
index 103402d2..49a84042 100644
--- a/src/main/java/teetime/framework/OutputPort.java
+++ b/src/main/java/teetime/framework/OutputPort.java
@@ -27,7 +27,7 @@ import teetime.framework.signal.TerminatingSignal;
  *
  * @since 1.0
  */
-public final class OutputPort<T> extends AbstractPort<T> {
+public class OutputPort<T> extends AbstractPort<T> {
 
 	OutputPort(final Class<T> type, final Stage owningStage, final String portName) {
 		super(type, owningStage, portName);
diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java
index a2c867ff..5f6b0d41 100644
--- a/src/main/java/teetime/framework/Stage.java
+++ b/src/main/java/teetime/framework/Stage.java
@@ -148,4 +148,6 @@ public abstract class Stage {
 		this.exceptionHandler = exceptionHandler;
 	}
 
+	protected abstract void removeDynamicPort(DynamicOutputPort<?> dynamicOutputPort);
+
 }
diff --git a/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java b/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java
deleted file mode 100644
index fc66d3c8..00000000
--- a/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java
+++ /dev/null
@@ -1,121 +0,0 @@
-package teetime.stage.basic.distributor;
-
-import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-
-import org.jctools.queues.QueueFactory;
-import org.jctools.queues.spec.ConcurrentQueueSpec;
-import org.jctools.queues.spec.Ordering;
-import org.jctools.queues.spec.Preference;
-
-import teetime.framework.AbstractStage;
-import teetime.framework.InputPort;
-import teetime.framework.OutputPort;
-import teetime.framework.Stage;
-import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
-import teetime.framework.exceptionHandling.StageException;
-import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
-import teetime.util.concurrent.queue.PCBlockingQueue;
-import teetime.util.concurrent.queue.putstrategy.PutStrategy;
-import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
-import teetime.util.concurrent.queue.takestrategy.SCParkTakeStrategy;
-import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
-
-public class ControlledDistributor<T> extends AbstractStage {
-
-	// private final InputPort<DynamicPortActionContainer<T>> dynamicPortActionInputPort = createInputPort();
-	private final InputPort<T> inputPort = createInputPort();
-
-	private final OutputPort<T> outputPort = createOutputPort();
-
-	private final BlockingQueue<DynamicPortActionContainer<T>> actions;
-
-	public ControlledDistributor() {
-		final Queue<DynamicPortActionContainer<T>> localQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
-		final PutStrategy<DynamicPortActionContainer<T>> putStrategy = new YieldPutStrategy<DynamicPortActionContainer<T>>();
-		final TakeStrategy<DynamicPortActionContainer<T>> takeStrategy = new SCParkTakeStrategy<DynamicPortActionContainer<T>>();
-		actions = new PCBlockingQueue<DynamicPortActionContainer<T>>(localQueue, putStrategy, takeStrategy);
-	}
-
-	@Override
-	public void onStarting() throws Exception {
-		getDistributor(outputPort); // throws an ClassCastException if it is not a distributor
-		super.onStarting();
-	}
-
-	@Override
-	// first, receive exact one element from the inputPort
-	// second, receive exact one element from the dynamicPortActionInputPort
-	// next, repeat in this order
-	protected void executeStage() {
-		T element = inputPort.receive();
-		if (null == element) {
-			returnNoElement();
-		}
-		passToDistributor(element);
-
-		try {
-			// DynamicPortActionContainer<T> dynamicPortAction = dynamicPortActionInputPort.receive();
-			DynamicPortActionContainer<T> dynamicPortAction = actions.take();
-			// DynamicPortActionContainer<T> dynamicPortAction = actions.poll();
-			if (null == dynamicPortAction) {
-				returnNoElement();
-			}
-			checkForOutputPortChange(dynamicPortAction);
-
-		} catch (InterruptedException e) {
-			final FurtherExecution furtherExecution = exceptionHandler.onStageException(e, this);
-			if (furtherExecution == FurtherExecution.TERMINATE) {
-				throw new StageException(e, this);
-			}
-		}
-	}
-
-	private void checkForOutputPortChange(final DynamicPortActionContainer<T> dynamicPortAction) {
-		System.out.println("" + dynamicPortAction.getDynamicPortAction());
-
-		switch (dynamicPortAction.getDynamicPortAction()) {
-		case CREATE:
-			Distributor<T> distributor = getDistributor(outputPort);
-			OutputPort<T> newOutputPort = distributor.getNewOutputPort();
-			dynamicPortAction.execute(newOutputPort);
-			break;
-		case REMOVE:
-			// TODO implement "remove port at runtime"
-			break;
-		default:
-			if (logger.isWarnEnabled()) {
-				logger.warn("Unhandled switch case of " + DynamicPortAction.class.getName() + ": " + dynamicPortAction.getDynamicPortAction());
-			}
-			break;
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	private Distributor<T> getDistributor(final OutputPort<T> outputPort2) {
-		final Stage owningStage = outputPort.getPipe().getTargetPort().getOwningStage();
-		return (Distributor<T>) owningStage;
-	}
-
-	private void passToDistributor(final T element) {
-		System.out.println("Passing " + element);
-		outputPort.send(element);
-	}
-
-	public InputPort<T> getInputPort() {
-		return inputPort;
-	}
-
-	// public InputPort<DynamicPortActionContainer<T>> getDynamicPortActionInputPort() {
-	// return dynamicPortActionInputPort;
-	// }
-
-	public OutputPort<T> getOutputPort() {
-		return outputPort;
-	}
-
-	public Queue<DynamicPortActionContainer<T>> getActions() {
-		return actions;
-	}
-
-}
diff --git a/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java
deleted file mode 100644
index 7aeb2cf2..00000000
--- a/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package teetime.stage.basic.distributor;
-
-import java.util.Queue;
-
-import org.jctools.queues.QueueFactory;
-import org.jctools.queues.spec.ConcurrentQueueSpec;
-import org.jctools.queues.spec.Ordering;
-import org.jctools.queues.spec.Preference;
-
-import teetime.framework.OutputPort;
-import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
-import teetime.util.concurrent.queue.PCBlockingQueue;
-import teetime.util.concurrent.queue.putstrategy.PutStrategy;
-import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
-import teetime.util.concurrent.queue.takestrategy.SCParkTakeStrategy;
-import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
-
-public class DynamicDistributor<T> extends Distributor<T> {
-
-	private final PCBlockingQueue<DynamicPortActionContainer<T>> actions;
-
-	public DynamicDistributor() {
-		final Queue<DynamicPortActionContainer<T>> localQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
-		final PutStrategy<DynamicPortActionContainer<T>> putStrategy = new YieldPutStrategy<DynamicPortActionContainer<T>>();
-		final TakeStrategy<DynamicPortActionContainer<T>> takeStrategy = new SCParkTakeStrategy<DynamicPortActionContainer<T>>();
-		actions = new PCBlockingQueue<DynamicPortActionContainer<T>>(localQueue, putStrategy, takeStrategy);
-	}
-
-	@Override
-	protected void execute(final T element) {
-		checkForPendingPortActionRequest();
-
-		super.execute(element);
-	}
-
-	private void checkForPendingPortActionRequest() {
-		DynamicPortActionContainer<T> dynamicPortAction = actions.poll();
-		switch (dynamicPortAction.getDynamicPortAction()) {
-		case CREATE:
-			OutputPort<T> newOutputPort = createOutputPort();
-			dynamicPortAction.execute(newOutputPort);
-			break;
-		case REMOVE:
-			// TODO implement "remove port at runtime"
-			break;
-		default:
-			if (logger.isWarnEnabled()) {
-				logger.warn("Unhandled switch case of " + DynamicPortAction.class.getName() + ": " + dynamicPortAction.getDynamicPortAction());
-			}
-			break;
-		}
-	}
-}
diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java
new file mode 100644
index 00000000..f4332bef
--- /dev/null
+++ b/src/main/java/teetime/stage/basic/distributor/dynamic/ControlledDynamicDistributor.java
@@ -0,0 +1,10 @@
+package teetime.stage.basic.distributor.dynamic;
+
+public class ControlledDynamicDistributor<T> extends DynamicDistributor<T> {
+
+	@Override
+	protected PortAction<T> getPortAction() throws InterruptedException {
+		return portActions.take();
+	}
+
+}
diff --git a/src/main/java/teetime/stage/basic/distributor/DynamicPortActionContainer.java b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java
similarity index 56%
rename from src/main/java/teetime/stage/basic/distributor/DynamicPortActionContainer.java
rename to src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java
index d57e9abe..1d91c76c 100644
--- a/src/main/java/teetime/stage/basic/distributor/DynamicPortActionContainer.java
+++ b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java
@@ -1,4 +1,4 @@
-package teetime.stage.basic.distributor;
+package teetime.stage.basic.distributor.dynamic;
 
 import teetime.framework.DynamicActuator;
 import teetime.framework.InputPort;
@@ -7,37 +7,29 @@ import teetime.framework.pipe.SpScPipeFactory;
 import teetime.framework.signal.InitializingSignal;
 import teetime.framework.signal.StartingSignal;
 
-public class DynamicPortActionContainer<T> {
+public class CreatePortAction<T> implements PortAction<T> {
 
 	private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory();
+	private static final DynamicActuator DYNAMIC_ACTUATOR = new DynamicActuator();
 
-	private final DynamicActuator dynamicActuator = new DynamicActuator();
-
-	public enum DynamicPortAction {
-		CREATE, REMOVE;
-	}
-
-	private final DynamicPortAction dynamicPortAction;
 	private final InputPort<T> inputPort;
 
-	public DynamicPortActionContainer(final DynamicPortAction dynamicPortAction, final InputPort<T> inputPort) {
+	public CreatePortAction(final InputPort<T> inputPort) {
 		super();
-		this.dynamicPortAction = dynamicPortAction;
 		this.inputPort = inputPort;
 	}
 
-	public DynamicPortAction getDynamicPortAction() {
-		return dynamicPortAction;
-	}
-
 	public InputPort<T> getInputPort() {
 		return inputPort;
 	}
 
-	public void execute(final OutputPort<T> newOutputPort) {
+	@Override
+	public void execute(final DynamicDistributor<T> dynamicDistributor) {
+		OutputPort<? extends T> newOutputPort = dynamicDistributor.getNewOutputPort();
+
 		INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort);
 
-		Runnable runnable = dynamicActuator.wrap(inputPort.getOwningStage());
+		Runnable runnable = DYNAMIC_ACTUATOR.wrap(inputPort.getOwningStage());
 		Thread thread = new Thread(runnable);
 		thread.start();
 
@@ -46,5 +38,4 @@ public class DynamicPortActionContainer<T> {
 
 		// FIXME pass the new thread to the analysis so that it can terminate the thread at the end
 	}
-
 }
diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/DoNothingPortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/DoNothingPortAction.java
new file mode 100644
index 00000000..bb4779a4
--- /dev/null
+++ b/src/main/java/teetime/stage/basic/distributor/dynamic/DoNothingPortAction.java
@@ -0,0 +1,10 @@
+package teetime.stage.basic.distributor.dynamic;
+
+public class DoNothingPortAction<T> implements PortAction<T> {
+
+	@Override
+	public void execute(final DynamicDistributor<T> dynamicDistributor) {
+		// do nothing for testing purpose
+	}
+
+}
diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java
new file mode 100644
index 00000000..d3f9c39e
--- /dev/null
+++ b/src/main/java/teetime/stage/basic/distributor/dynamic/DynamicDistributor.java
@@ -0,0 +1,60 @@
+package teetime.stage.basic.distributor.dynamic;
+
+import java.util.Queue;
+
+import org.jctools.queues.QueueFactory;
+import org.jctools.queues.spec.ConcurrentQueueSpec;
+import org.jctools.queues.spec.Ordering;
+import org.jctools.queues.spec.Preference;
+
+import teetime.framework.DynamicOutputPort;
+import teetime.stage.basic.distributor.Distributor;
+import teetime.util.concurrent.queue.PCBlockingQueue;
+import teetime.util.concurrent.queue.putstrategy.PutStrategy;
+import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
+import teetime.util.concurrent.queue.takestrategy.SCParkTakeStrategy;
+import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
+
+public class DynamicDistributor<T> extends Distributor<T> {
+
+	protected final PCBlockingQueue<PortAction<T>> portActions;
+
+	public DynamicDistributor() {
+		final Queue<PortAction<T>> localQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
+		final PutStrategy<PortAction<T>> putStrategy = new YieldPutStrategy<PortAction<T>>();
+		final TakeStrategy<PortAction<T>> takeStrategy = new SCParkTakeStrategy<PortAction<T>>();
+		portActions = new PCBlockingQueue<PortAction<T>>(localQueue, putStrategy, takeStrategy);
+	}
+
+	@Override
+	protected void execute(final T element) {
+		try {
+			checkForPendingPortActionRequest();
+		} catch (InterruptedException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+
+		super.execute(element);
+	}
+
+	private void checkForPendingPortActionRequest() throws InterruptedException {
+		PortAction<T> dynamicPortAction = getPortAction();
+		if (null != dynamicPortAction) {
+			dynamicPortAction.execute(this);
+		}
+	}
+
+	protected PortAction<T> getPortAction() throws InterruptedException {
+		return portActions.poll();
+	}
+
+	@Override
+	public void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) {
+		super.removeDynamicPort(dynamicOutputPort);
+	}
+
+	public boolean addPortActionRequest(final PortAction<T> newPortActionRequest) {
+		return portActions.offer(newPortActionRequest);
+	}
+}
diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/PortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/PortAction.java
new file mode 100644
index 00000000..885b26f1
--- /dev/null
+++ b/src/main/java/teetime/stage/basic/distributor/dynamic/PortAction.java
@@ -0,0 +1,8 @@
+package teetime.stage.basic.distributor.dynamic;
+
+
+public interface PortAction<T> {
+
+	public abstract void execute(final DynamicDistributor<T> dynamicDistributor);
+
+}
diff --git a/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java
index 49e9e88d..62746580 100644
--- a/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java
+++ b/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java
@@ -16,7 +16,11 @@ import teetime.framework.AnalysisConfiguration;
 import teetime.framework.Stage;
 import teetime.stage.CollectorSink;
 import teetime.stage.InitialElementProducer;
-import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
+import teetime.stage.basic.distributor.dynamic.ControlledDynamicDistributor;
+import teetime.stage.basic.distributor.dynamic.CreatePortAction;
+import teetime.stage.basic.distributor.dynamic.DoNothingPortAction;
+import teetime.stage.basic.distributor.dynamic.DynamicDistributor;
+import teetime.stage.basic.distributor.dynamic.PortAction;
 
 public class ControlledDistributorTest {
 
@@ -29,12 +33,11 @@ public class ControlledDistributorTest {
 
 	@Test
 	public void shouldWorkWithoutActionTriggers() throws Exception {
-		DynamicPortActionContainer<Integer> createAction = new DynamicPortActionContainer<Integer>(
-				DynamicPortAction.REMOVE, null);
+		PortAction<Integer> createAction = new DoNothingPortAction<Integer>();
 
 		List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4);
 		@SuppressWarnings("unchecked")
-		List<DynamicPortActionContainer<Integer>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction);
+		List<PortAction<Integer>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction);
 
 		ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, inputActions);
 		Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config);
@@ -49,7 +52,7 @@ public class ControlledDistributorTest {
 		List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4);
 
 		@SuppressWarnings("unchecked")
-		DynamicPortActionContainer<Integer>[] inputActions = new DynamicPortActionContainer[5];
+		PortAction<Integer>[] inputActions = new PortAction[5];
 		for (int i = 0; i < inputActions.length; i++) {
 			CollectorSink<Integer> newStage = new CollectorSink<Integer>();
 
@@ -57,8 +60,7 @@ public class ControlledDistributorTest {
 			// Thread thread = new Thread(runnable);
 			// thread.start();
 
-			DynamicPortActionContainer<Integer> createAction = new DynamicPortActionContainer<Integer>(
-					DynamicPortAction.CREATE, newStage.getInputPort());
+			PortAction<Integer> createAction = new CreatePortAction<Integer>(newStage.getInputPort());
 			inputActions[i] = createAction;
 		}
 
@@ -67,25 +69,25 @@ public class ControlledDistributorTest {
 
 		analysis.executeBlocking();
 
-		for (DynamicPortActionContainer<Integer> ia : inputActions) {
-			Stage stage = ia.getInputPort().getOwningStage();
+		for (PortAction<Integer> ia : inputActions) {
+			Stage stage = ((CreatePortAction<Integer>) ia).getInputPort().getOwningStage();
 			@SuppressWarnings("unchecked")
 			CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage;
 			System.out.println("collectorSink: " + collectorSink.getElements());
 		}
 
-		assertThat(config.getOutputElements(), contains(0, 1));
-		assertValuesForIndex(inputActions, Arrays.asList(2), 0);
-		assertValuesForIndex(inputActions, Arrays.asList(3), 1);
-		assertValuesForIndex(inputActions, Arrays.asList(4), 2);
-		assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 3);
+		assertThat(config.getOutputElements(), contains(0));
+		assertValuesForIndex(inputActions, Arrays.asList(1), 0);
+		assertValuesForIndex(inputActions, Arrays.asList(2), 1);
+		assertValuesForIndex(inputActions, Arrays.asList(3), 2);
+		assertValuesForIndex(inputActions, Arrays.asList(4), 3);
 		assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 4);
 	}
 
-	private void assertValuesForIndex(final DynamicPortActionContainer<Integer>[] inputActions,
+	private void assertValuesForIndex(final PortAction<Integer>[] inputActions,
 			final List<Integer> values, final int index) {
-		DynamicPortActionContainer<Integer> ia = inputActions[index];
-		Stage stage = ia.getInputPort().getOwningStage();
+		PortAction<Integer> ia = inputActions[index];
+		Stage stage = ((CreatePortAction<Integer>) ia).getInputPort().getOwningStage();
 		@SuppressWarnings("unchecked")
 		CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage;
 		assertThat(collectorSink.getElements(), is(values));
@@ -95,24 +97,24 @@ public class ControlledDistributorTest {
 
 		private final CollectorSink<T> collectorSink;
 
-		public ControlledDistributorTestConfig(final List<T> elements, final List<DynamicPortActionContainer<T>> actions) {
+		public ControlledDistributorTestConfig(final List<T> elements, final List<PortAction<T>> portActions) {
 			InitialElementProducer<T> initialElementProducer = new InitialElementProducer<T>(elements);
-			// InitialElementProducer<DynamicPortActionContainer<T>> initialActionProducer = new InitialElementProducer<DynamicPortActionContainer<T>>(actions);
-			ControlledDistributor<T> controlledDistributor = new ControlledDistributor<T>();
-			Distributor<T> distributor = new Distributor<T>();
+			// InitialElementProducer<PortAction<T>> initialActionProducer = new InitialElementProducer<PortAction<T>>(actions);
+			DynamicDistributor<T> distributor = new ControlledDynamicDistributor<T>();
 			collectorSink = new CollectorSink<T>();
 
-			connectPorts(initialElementProducer.getOutputPort(), controlledDistributor.getInputPort());
+			connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort());
 			// connectPorts(initialActionProducer.getOutputPort(), controlledDistributor.getDynamicPortActionInputPort());
-			connectPorts(controlledDistributor.getOutputPort(), distributor.getInputPort());
 			connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort());
 
 			addThreadableStage(initialElementProducer);
 			// addThreadableStage(initialActionProducer); // simulates the AdaptationThread
-			addThreadableStage(controlledDistributor);
+			addThreadableStage(distributor);
 			addThreadableStage(collectorSink);
 
-			controlledDistributor.getActions().addAll(actions);
+			for (PortAction<T> a : portActions) {
+				distributor.addPortActionRequest(a);
+			}
 		}
 
 		public List<T> getOutputElements() {
-- 
GitLab