From 19c7548942b4144cb3f24c1fc4a743009c6a5be7 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Fri, 19 Jun 2015 07:52:37 +0200
Subject: [PATCH] completed CREATE action for dynamic distributor

---
 .../framework/AbstractInterThreadPipe.java    | 12 ++++++++----
 .../teetime/framework/DynamicActuator.java    |  8 ++++++++
 .../distributor/ControlledDistributor.java    | 19 +++++++++++++++----
 .../ControlledDistributorTest.java            | 10 +++++++++-
 src/test/resources/logback-test.xml           |  2 +-
 5 files changed, 41 insertions(+), 10 deletions(-)
 create mode 100644 src/main/java/teetime/framework/DynamicActuator.java

diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java
index acb1752e..9ed1d4c9 100644
--- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java
+++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java
@@ -25,6 +25,7 @@ import org.jctools.queues.spec.Preference;
 
 import teetime.framework.signal.ISignal;
 import teetime.framework.signal.InitializingSignal;
+import teetime.framework.signal.StartingSignal;
 import teetime.util.concurrent.queue.PCBlockingQueue;
 import teetime.util.concurrent.queue.putstrategy.PutStrategy;
 import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
@@ -65,16 +66,19 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe {
 	}
 
 	@Override
-	public final void waitForStartSignal() throws InterruptedException {
+	public final void waitForInitializingSignal() throws InterruptedException {
 		final ISignal signal = signalQueue.take();
+		if (!(signal instanceof InitializingSignal)) {
+			throw new IllegalStateException("Expected InitializingSignal, but was " + signal.getClass().getSimpleName());
+		}
 		cachedTargetStage.onSignal(signal, getTargetPort());
 	}
 
 	@Override
-	public final void waitForInitializingSignal() throws InterruptedException {
+	public final void waitForStartSignal() throws InterruptedException {
 		final ISignal signal = signalQueue.take();
-		if (!(signal instanceof InitializingSignal)) {
-			throw new IllegalStateException("Expected InitializingSignal, but was not the first arriving signal");
+		if (!(signal instanceof StartingSignal)) {
+			throw new IllegalStateException("Expected StartingSignal, but was " + signal.getClass().getSimpleName());
 		}
 		cachedTargetStage.onSignal(signal, getTargetPort());
 	}
diff --git a/src/main/java/teetime/framework/DynamicActuator.java b/src/main/java/teetime/framework/DynamicActuator.java
new file mode 100644
index 00000000..c53abdd4
--- /dev/null
+++ b/src/main/java/teetime/framework/DynamicActuator.java
@@ -0,0 +1,8 @@
+package teetime.framework;
+
+public class DynamicActuator {
+
+	public Runnable wrap(final Stage stage) {
+		return new RunnableConsumerStage(stage);
+	}
+}
diff --git a/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java b/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java
index 84ad9b21..cb9237f0 100644
--- a/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java
+++ b/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java
@@ -9,13 +9,15 @@ import org.jctools.queues.spec.Ordering;
 import org.jctools.queues.spec.Preference;
 
 import teetime.framework.AbstractStage;
+import teetime.framework.DynamicActuator;
 import teetime.framework.InputPort;
 import teetime.framework.OutputPort;
 import teetime.framework.Stage;
 import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
 import teetime.framework.exceptionHandling.StageException;
-import teetime.framework.pipe.SingleElementPipeFactory;
 import teetime.framework.pipe.SpScPipeFactory;
+import teetime.framework.signal.InitializingSignal;
+import teetime.framework.signal.StartingSignal;
 import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
 import teetime.util.concurrent.queue.PCBlockingQueue;
 import teetime.util.concurrent.queue.putstrategy.PutStrategy;
@@ -26,7 +28,6 @@ import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
 public class ControlledDistributor<T> extends AbstractStage {
 
 	private static final SpScPipeFactory spScPipeFactory = new SpScPipeFactory();
-	private static final SingleElementPipeFactory intraPipeFactory = new SingleElementPipeFactory();
 
 	// private final InputPort<DynamicPortActionContainer<T>> dynamicPortActionInputPort = createInputPort();
 	private final InputPort<T> inputPort = createInputPort();
@@ -76,6 +77,8 @@ public class ControlledDistributor<T> extends AbstractStage {
 		}
 	}
 
+	private final DynamicActuator dynamicActuator = new DynamicActuator();
+
 	private void checkForOutputPortChange(final DynamicPortActionContainer<T> dynamicPortAction) {
 		System.out.println("" + dynamicPortAction.getDynamicPortAction());
 
@@ -84,8 +87,16 @@ public class ControlledDistributor<T> extends AbstractStage {
 			Distributor<T> distributor = getDistributor(outputPort);
 			OutputPort<T> newOutputPort = distributor.getNewOutputPort();
 			InputPort<T> newInputPort = dynamicPortAction.getInputPort();
-			// spScPipeFactory.create(newOutputPort, newInputPort);
-			intraPipeFactory.create(newOutputPort, newInputPort); // FIXME should be inter, but requires sending init and start signal
+			spScPipeFactory.create(newOutputPort, newInputPort);
+
+			Runnable runnable = dynamicActuator.wrap(newInputPort.getOwningStage());
+			Thread thread = new Thread(runnable);
+			thread.start();
+
+			newOutputPort.sendSignal(new InitializingSignal());
+			newOutputPort.sendSignal(new StartingSignal());
+
+			// FIXME pass the new thread to the analysis so that it can terminate the thread at the end
 			break;
 		case REMOVE:
 			// TODO implement "remove port at runtime"
diff --git a/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java
index 42b13abb..69a537b5 100644
--- a/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java
+++ b/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java
@@ -13,6 +13,7 @@ import org.junit.Test;
 
 import teetime.framework.Analysis;
 import teetime.framework.AnalysisConfiguration;
+import teetime.framework.DynamicActuator;
 import teetime.framework.Stage;
 import teetime.stage.CollectorSink;
 import teetime.stage.InitialElementProducer;
@@ -21,6 +22,7 @@ import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAct
 public class ControlledDistributorTest {
 
 	// private ControlledDistributor<Integer> controlledDistributor;
+	private final DynamicActuator dynamicActuator = new DynamicActuator();
 
 	@Before
 	public void setUp() throws Exception {
@@ -51,8 +53,14 @@ public class ControlledDistributorTest {
 		@SuppressWarnings("unchecked")
 		DynamicPortActionContainer<Integer>[] inputActions = new DynamicPortActionContainer[5];
 		for (int i = 0; i < inputActions.length; i++) {
+			CollectorSink<Integer> newStage = new CollectorSink<Integer>();
+
+			// Runnable runnable = dynamicActuator.wrap(newStage);
+			// Thread thread = new Thread(runnable);
+			// thread.start();
+
 			DynamicPortActionContainer<Integer> createAction = new DynamicPortActionContainer<Integer>(
-					DynamicPortAction.CREATE, new CollectorSink<Integer>().getInputPort());
+					DynamicPortAction.CREATE, newStage.getInputPort());
 			inputActions[i] = createAction;
 		}
 
diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml
index 19f06bd8..45736549 100644
--- a/src/test/resources/logback-test.xml
+++ b/src/test/resources/logback-test.xml
@@ -21,7 +21,7 @@
 	</appender>
 	
 	<logger name="teetime" level="INFO" />
-<!-- 	<logger name="teetime.framework" level="TRACE" /> -->
+	<logger name="teetime.framework" level="TRACE" />
 <!-- 	<logger name="teetime.framework.signal" level="TRACE" /> -->
 <!-- 	<logger name="teetime.stage" level="TRACE" /> -->
 	<logger name="util" level="INFO" />
-- 
GitLab