From 11e116dff6c9b694a506b8e9bded41bdd7fc27fe Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Thu, 18 Jun 2015 17:45:39 +0200
Subject: [PATCH] added ControlledDistributor + test

---
 .../distributor/ControlledDistributor.java    | 128 ++++++++++++++++++
 .../basic/distributor/DynamicDistributor.java |  31 +----
 .../DynamicPortActionContainer.java           |  28 ++++
 .../ControlledDistributorTest.java            | 116 ++++++++++++++++
 4 files changed, 276 insertions(+), 27 deletions(-)
 create mode 100644 src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java
 create mode 100644 src/main/java/teetime/stage/basic/distributor/DynamicPortActionContainer.java
 create mode 100644 src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java

diff --git a/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java b/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java
new file mode 100644
index 00000000..84ad9b21
--- /dev/null
+++ b/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java
@@ -0,0 +1,128 @@
+package teetime.stage.basic.distributor;
+
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+
+import org.jctools.queues.QueueFactory;
+import org.jctools.queues.spec.ConcurrentQueueSpec;
+import org.jctools.queues.spec.Ordering;
+import org.jctools.queues.spec.Preference;
+
+import teetime.framework.AbstractStage;
+import teetime.framework.InputPort;
+import teetime.framework.OutputPort;
+import teetime.framework.Stage;
+import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
+import teetime.framework.exceptionHandling.StageException;
+import teetime.framework.pipe.SingleElementPipeFactory;
+import teetime.framework.pipe.SpScPipeFactory;
+import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
+import teetime.util.concurrent.queue.PCBlockingQueue;
+import teetime.util.concurrent.queue.putstrategy.PutStrategy;
+import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
+import teetime.util.concurrent.queue.takestrategy.SCParkTakeStrategy;
+import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
+
+public class ControlledDistributor<T> extends AbstractStage {
+
+	private static final SpScPipeFactory spScPipeFactory = new SpScPipeFactory();
+	private static final SingleElementPipeFactory intraPipeFactory = new SingleElementPipeFactory();
+
+	// private final InputPort<DynamicPortActionContainer<T>> dynamicPortActionInputPort = createInputPort();
+	private final InputPort<T> inputPort = createInputPort();
+
+	private final OutputPort<T> outputPort = createOutputPort();
+
+	private final BlockingQueue<DynamicPortActionContainer<T>> actions;
+
+	public ControlledDistributor() {
+		final Queue<DynamicPortActionContainer<T>> localQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
+		final PutStrategy<DynamicPortActionContainer<T>> putStrategy = new YieldPutStrategy<DynamicPortActionContainer<T>>();
+		final TakeStrategy<DynamicPortActionContainer<T>> takeStrategy = new SCParkTakeStrategy<DynamicPortActionContainer<T>>();
+		actions = new PCBlockingQueue<DynamicPortActionContainer<T>>(localQueue, putStrategy, takeStrategy);
+	}
+
+	@Override
+	public void onStarting() throws Exception {
+		getDistributor(outputPort); // throws an ClassCastException if it is not a distributor
+		super.onStarting();
+	}
+
+	@Override
+	// first, receive exact one element from the inputPort
+	// second, receive exact one element from the dynamicPortActionInputPort
+	// next, repeat in this order
+	protected void executeStage() {
+		T element = inputPort.receive();
+		if (null == element) {
+			returnNoElement();
+		}
+		passToDistributor(element);
+
+		try {
+			// DynamicPortActionContainer<T> dynamicPortAction = dynamicPortActionInputPort.receive();
+			DynamicPortActionContainer<T> dynamicPortAction = actions.take();
+			// DynamicPortActionContainer<T> dynamicPortAction = actions.poll();
+			if (null == dynamicPortAction) {
+				returnNoElement();
+			}
+			checkForOutputPortChange(dynamicPortAction);
+
+		} catch (InterruptedException e) {
+			final FurtherExecution furtherExecution = exceptionHandler.onStageException(e, this);
+			if (furtherExecution == FurtherExecution.TERMINATE) {
+				throw new StageException(e, this);
+			}
+		}
+	}
+
+	private void checkForOutputPortChange(final DynamicPortActionContainer<T> dynamicPortAction) {
+		System.out.println("" + dynamicPortAction.getDynamicPortAction());
+
+		switch (dynamicPortAction.getDynamicPortAction()) {
+		case CREATE:
+			Distributor<T> distributor = getDistributor(outputPort);
+			OutputPort<T> newOutputPort = distributor.getNewOutputPort();
+			InputPort<T> newInputPort = dynamicPortAction.getInputPort();
+			// spScPipeFactory.create(newOutputPort, newInputPort);
+			intraPipeFactory.create(newOutputPort, newInputPort); // FIXME should be inter, but requires sending init and start signal
+			break;
+		case REMOVE:
+			// TODO implement "remove port at runtime"
+			break;
+		default:
+			if (logger.isWarnEnabled()) {
+				logger.warn("Unhandled switch case of " + DynamicPortAction.class.getName() + ": " + dynamicPortAction.getDynamicPortAction());
+			}
+			break;
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private Distributor<T> getDistributor(final OutputPort<T> outputPort2) {
+		final Stage owningStage = outputPort.getPipe().getTargetPort().getOwningStage();
+		return (Distributor<T>) owningStage;
+	}
+
+	private void passToDistributor(final T element) {
+		System.out.println("Passing " + element);
+		outputPort.send(element);
+	}
+
+	public InputPort<T> getInputPort() {
+		return inputPort;
+	}
+
+	// public InputPort<DynamicPortActionContainer<T>> getDynamicPortActionInputPort() {
+	// return dynamicPortActionInputPort;
+	// }
+
+	public OutputPort<T> getOutputPort() {
+		return outputPort;
+	}
+
+	public Queue<DynamicPortActionContainer<T>> getActions() {
+		return actions;
+	}
+
+}
diff --git a/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java
index c7a48c65..a7e4c9f1 100644
--- a/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java
+++ b/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java
@@ -3,35 +3,12 @@ package teetime.stage.basic.distributor;
 import teetime.framework.InputPort;
 import teetime.framework.OutputPort;
 import teetime.framework.pipe.SpScPipeFactory;
+import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
 
 public class DynamicDistributor<T> extends Distributor<T> {
 
 	private static final SpScPipeFactory spScPipeFactory = new SpScPipeFactory();
 
-	public enum DynamicPortAction {
-		CREATE, REMOVE;
-	}
-
-	public static class DynamicPortActionContainer<T> {
-		private final DynamicPortAction dynamicPortAction;
-		private final InputPort<T> inputPort;
-
-		public DynamicPortActionContainer(final DynamicPortAction dynamicPortAction, final InputPort<T> inputPort) {
-			super();
-			this.dynamicPortAction = dynamicPortAction;
-			this.inputPort = inputPort;
-		}
-
-		public DynamicPortAction getDynamicPortAction() {
-			return dynamicPortAction;
-		}
-
-		public InputPort<T> getInputPort() {
-			return inputPort;
-		}
-
-	}
-
 	@SuppressWarnings("rawtypes")
 	private final InputPort<DynamicPortActionContainer> dynamicPortActionInputPort = createInputPort(DynamicPortActionContainer.class);
 
@@ -39,10 +16,10 @@ public class DynamicDistributor<T> extends Distributor<T> {
 	@Override
 	protected void execute(final T element) {
 		DynamicPortActionContainer<T> dynamicPortAction = dynamicPortActionInputPort.receive();
-		switch (dynamicPortAction.dynamicPortAction) {
+		switch (dynamicPortAction.getDynamicPortAction()) {
 		case CREATE:
 			OutputPort<T> newOutputPort = createOutputPort();
-			InputPort<T> newInputPort = dynamicPortAction.inputPort;
+			InputPort<T> newInputPort = dynamicPortAction.getInputPort();
 			spScPipeFactory.create(newOutputPort, newInputPort);
 			break;
 		case REMOVE:
@@ -50,7 +27,7 @@ public class DynamicDistributor<T> extends Distributor<T> {
 			break;
 		default:
 			if (logger.isWarnEnabled()) {
-				logger.warn("Unhandled switch case of " + DynamicPortAction.class.getName() + ": " + dynamicPortAction.dynamicPortAction);
+				logger.warn("Unhandled switch case of " + DynamicPortAction.class.getName() + ": " + dynamicPortAction.getDynamicPortAction());
 			}
 			break;
 		}
diff --git a/src/main/java/teetime/stage/basic/distributor/DynamicPortActionContainer.java b/src/main/java/teetime/stage/basic/distributor/DynamicPortActionContainer.java
new file mode 100644
index 00000000..57847fd3
--- /dev/null
+++ b/src/main/java/teetime/stage/basic/distributor/DynamicPortActionContainer.java
@@ -0,0 +1,28 @@
+package teetime.stage.basic.distributor;
+
+import teetime.framework.InputPort;
+
+public class DynamicPortActionContainer<T> {
+
+	public enum DynamicPortAction {
+		CREATE, REMOVE;
+	}
+
+	private final DynamicPortAction dynamicPortAction;
+	private final InputPort<T> inputPort;
+
+	public DynamicPortActionContainer(final DynamicPortAction dynamicPortAction, final InputPort<T> inputPort) {
+		super();
+		this.dynamicPortAction = dynamicPortAction;
+		this.inputPort = inputPort;
+	}
+
+	public DynamicPortAction getDynamicPortAction() {
+		return dynamicPortAction;
+	}
+
+	public InputPort<T> getInputPort() {
+		return inputPort;
+	}
+
+}
diff --git a/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java
new file mode 100644
index 00000000..42b13abb
--- /dev/null
+++ b/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java
@@ -0,0 +1,116 @@
+package teetime.stage.basic.distributor;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import teetime.framework.Analysis;
+import teetime.framework.AnalysisConfiguration;
+import teetime.framework.Stage;
+import teetime.stage.CollectorSink;
+import teetime.stage.InitialElementProducer;
+import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
+
+public class ControlledDistributorTest {
+
+	// private ControlledDistributor<Integer> controlledDistributor;
+
+	@Before
+	public void setUp() throws Exception {
+		// controlledDistributor = new ControlledDistributor<Integer>();
+	}
+
+	@Test
+	public void shouldWorkWithoutActionTriggers() throws Exception {
+		DynamicPortActionContainer<Integer> createAction = new DynamicPortActionContainer<Integer>(
+				DynamicPortAction.REMOVE, null);
+
+		List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4);
+		@SuppressWarnings("unchecked")
+		List<DynamicPortActionContainer<Integer>> inputActions = Arrays.asList(createAction, createAction, createAction, createAction, createAction);
+
+		ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, inputActions);
+		Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config);
+
+		analysis.executeBlocking();
+
+		assertThat(config.getOutputElements(), contains(0, 1, 2, 3, 4));
+	}
+
+	@Test
+	public void shouldWorkWithActionTriggers() throws Exception {
+		List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4);
+
+		@SuppressWarnings("unchecked")
+		DynamicPortActionContainer<Integer>[] inputActions = new DynamicPortActionContainer[5];
+		for (int i = 0; i < inputActions.length; i++) {
+			DynamicPortActionContainer<Integer> createAction = new DynamicPortActionContainer<Integer>(
+					DynamicPortAction.CREATE, new CollectorSink<Integer>().getInputPort());
+			inputActions[i] = createAction;
+		}
+
+		ControlledDistributorTestConfig<Integer> config = new ControlledDistributorTestConfig<Integer>(inputNumbers, Arrays.asList(inputActions));
+		Analysis<ControlledDistributorTestConfig<Integer>> analysis = new Analysis<ControlledDistributorTestConfig<Integer>>(config);
+
+		analysis.executeBlocking();
+
+		for (DynamicPortActionContainer<Integer> ia : inputActions) {
+			Stage stage = ia.getInputPort().getOwningStage();
+			@SuppressWarnings("unchecked")
+			CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage;
+			System.out.println("collectorSink: " + collectorSink.getElements());
+		}
+
+		assertThat(config.getOutputElements(), contains(0, 1));
+		assertValuesForIndex(inputActions, Arrays.asList(2), 0);
+		assertValuesForIndex(inputActions, Arrays.asList(3), 1);
+		assertValuesForIndex(inputActions, Arrays.asList(4), 2);
+		assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 3);
+		assertValuesForIndex(inputActions, Collections.<Integer> emptyList(), 4);
+	}
+
+	private void assertValuesForIndex(final DynamicPortActionContainer<Integer>[] inputActions,
+			final List<Integer> values, final int index) {
+		DynamicPortActionContainer<Integer> ia = inputActions[index];
+		Stage stage = ia.getInputPort().getOwningStage();
+		@SuppressWarnings("unchecked")
+		CollectorSink<Integer> collectorSink = (CollectorSink<Integer>) stage;
+		assertThat(collectorSink.getElements(), is(values));
+	}
+
+	private static class ControlledDistributorTestConfig<T> extends AnalysisConfiguration {
+
+		private final CollectorSink<T> collectorSink;
+
+		public ControlledDistributorTestConfig(final List<T> elements, final List<DynamicPortActionContainer<T>> actions) {
+			InitialElementProducer<T> initialElementProducer = new InitialElementProducer<T>(elements);
+			// InitialElementProducer<DynamicPortActionContainer<T>> initialActionProducer = new InitialElementProducer<DynamicPortActionContainer<T>>(actions);
+			ControlledDistributor<T> controlledDistributor = new ControlledDistributor<T>();
+			Distributor<T> distributor = new Distributor<T>();
+			collectorSink = new CollectorSink<T>();
+
+			connectPorts(initialElementProducer.getOutputPort(), controlledDistributor.getInputPort());
+			// connectPorts(initialActionProducer.getOutputPort(), controlledDistributor.getDynamicPortActionInputPort());
+			connectPorts(controlledDistributor.getOutputPort(), distributor.getInputPort());
+			connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort());
+
+			addThreadableStage(initialElementProducer);
+			// addThreadableStage(initialActionProducer); // simulates the AdaptationThread
+			addThreadableStage(controlledDistributor);
+			addThreadableStage(collectorSink);
+
+			controlledDistributor.getActions().addAll(actions);
+		}
+
+		public List<T> getOutputElements() {
+			return collectorSink.getElements();
+		}
+	}
+}
-- 
GitLab