From 96f3df557054200d5c4b28da08a0da5ba61f1306 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Fri, 19 Jun 2015 08:05:20 +0200
Subject: [PATCH] refactoring

---
 .../distributor/ControlledDistributor.java    | 20 +---------
 .../basic/distributor/DynamicDistributor.java | 38 +++++++++++++------
 .../DynamicPortActionContainer.java           | 22 +++++++++++
 .../ControlledDistributorTest.java            |  2 -
 4 files changed, 50 insertions(+), 32 deletions(-)

diff --git a/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java b/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java
index cb9237f0..fc66d3c8 100644
--- a/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java
+++ b/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java
@@ -9,15 +9,11 @@ import org.jctools.queues.spec.Ordering;
 import org.jctools.queues.spec.Preference;
 
 import teetime.framework.AbstractStage;
-import teetime.framework.DynamicActuator;
 import teetime.framework.InputPort;
 import teetime.framework.OutputPort;
 import teetime.framework.Stage;
 import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
 import teetime.framework.exceptionHandling.StageException;
-import teetime.framework.pipe.SpScPipeFactory;
-import teetime.framework.signal.InitializingSignal;
-import teetime.framework.signal.StartingSignal;
 import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
 import teetime.util.concurrent.queue.PCBlockingQueue;
 import teetime.util.concurrent.queue.putstrategy.PutStrategy;
@@ -27,8 +23,6 @@ import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
 
 public class ControlledDistributor<T> extends AbstractStage {
 
-	private static final SpScPipeFactory spScPipeFactory = new SpScPipeFactory();
-
 	// private final InputPort<DynamicPortActionContainer<T>> dynamicPortActionInputPort = createInputPort();
 	private final InputPort<T> inputPort = createInputPort();
 
@@ -77,8 +71,6 @@ public class ControlledDistributor<T> extends AbstractStage {
 		}
 	}
 
-	private final DynamicActuator dynamicActuator = new DynamicActuator();
-
 	private void checkForOutputPortChange(final DynamicPortActionContainer<T> dynamicPortAction) {
 		System.out.println("" + dynamicPortAction.getDynamicPortAction());
 
@@ -86,17 +78,7 @@ public class ControlledDistributor<T> extends AbstractStage {
 		case CREATE:
 			Distributor<T> distributor = getDistributor(outputPort);
 			OutputPort<T> newOutputPort = distributor.getNewOutputPort();
-			InputPort<T> newInputPort = dynamicPortAction.getInputPort();
-			spScPipeFactory.create(newOutputPort, newInputPort);
-
-			Runnable runnable = dynamicActuator.wrap(newInputPort.getOwningStage());
-			Thread thread = new Thread(runnable);
-			thread.start();
-
-			newOutputPort.sendSignal(new InitializingSignal());
-			newOutputPort.sendSignal(new StartingSignal());
-
-			// FIXME pass the new thread to the analysis so that it can terminate the thread at the end
+			dynamicPortAction.execute(newOutputPort);
 			break;
 		case REMOVE:
 			// TODO implement "remove port at runtime"
diff --git a/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java
index a7e4c9f1..7aeb2cf2 100644
--- a/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java
+++ b/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java
@@ -1,26 +1,44 @@
 package teetime.stage.basic.distributor;
 
-import teetime.framework.InputPort;
+import java.util.Queue;
+
+import org.jctools.queues.QueueFactory;
+import org.jctools.queues.spec.ConcurrentQueueSpec;
+import org.jctools.queues.spec.Ordering;
+import org.jctools.queues.spec.Preference;
+
 import teetime.framework.OutputPort;
-import teetime.framework.pipe.SpScPipeFactory;
 import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
+import teetime.util.concurrent.queue.PCBlockingQueue;
+import teetime.util.concurrent.queue.putstrategy.PutStrategy;
+import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
+import teetime.util.concurrent.queue.takestrategy.SCParkTakeStrategy;
+import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
 
 public class DynamicDistributor<T> extends Distributor<T> {
 
-	private static final SpScPipeFactory spScPipeFactory = new SpScPipeFactory();
+	private final PCBlockingQueue<DynamicPortActionContainer<T>> actions;
 
-	@SuppressWarnings("rawtypes")
-	private final InputPort<DynamicPortActionContainer> dynamicPortActionInputPort = createInputPort(DynamicPortActionContainer.class);
+	public DynamicDistributor() {
+		final Queue<DynamicPortActionContainer<T>> localQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
+		final PutStrategy<DynamicPortActionContainer<T>> putStrategy = new YieldPutStrategy<DynamicPortActionContainer<T>>();
+		final TakeStrategy<DynamicPortActionContainer<T>> takeStrategy = new SCParkTakeStrategy<DynamicPortActionContainer<T>>();
+		actions = new PCBlockingQueue<DynamicPortActionContainer<T>>(localQueue, putStrategy, takeStrategy);
+	}
 
-	@SuppressWarnings("unchecked")
 	@Override
 	protected void execute(final T element) {
-		DynamicPortActionContainer<T> dynamicPortAction = dynamicPortActionInputPort.receive();
+		checkForPendingPortActionRequest();
+
+		super.execute(element);
+	}
+
+	private void checkForPendingPortActionRequest() {
+		DynamicPortActionContainer<T> dynamicPortAction = actions.poll();
 		switch (dynamicPortAction.getDynamicPortAction()) {
 		case CREATE:
 			OutputPort<T> newOutputPort = createOutputPort();
-			InputPort<T> newInputPort = dynamicPortAction.getInputPort();
-			spScPipeFactory.create(newOutputPort, newInputPort);
+			dynamicPortAction.execute(newOutputPort);
 			break;
 		case REMOVE:
 			// TODO implement "remove port at runtime"
@@ -31,7 +49,5 @@ public class DynamicDistributor<T> extends Distributor<T> {
 			}
 			break;
 		}
-
-		super.execute(element);
 	}
 }
diff --git a/src/main/java/teetime/stage/basic/distributor/DynamicPortActionContainer.java b/src/main/java/teetime/stage/basic/distributor/DynamicPortActionContainer.java
index 57847fd3..d57e9abe 100644
--- a/src/main/java/teetime/stage/basic/distributor/DynamicPortActionContainer.java
+++ b/src/main/java/teetime/stage/basic/distributor/DynamicPortActionContainer.java
@@ -1,9 +1,18 @@
 package teetime.stage.basic.distributor;
 
+import teetime.framework.DynamicActuator;
 import teetime.framework.InputPort;
+import teetime.framework.OutputPort;
+import teetime.framework.pipe.SpScPipeFactory;
+import teetime.framework.signal.InitializingSignal;
+import teetime.framework.signal.StartingSignal;
 
 public class DynamicPortActionContainer<T> {
 
+	private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory();
+
+	private final DynamicActuator dynamicActuator = new DynamicActuator();
+
 	public enum DynamicPortAction {
 		CREATE, REMOVE;
 	}
@@ -25,4 +34,17 @@ public class DynamicPortActionContainer<T> {
 		return inputPort;
 	}
 
+	public void execute(final OutputPort<T> newOutputPort) {
+		INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort);
+
+		Runnable runnable = dynamicActuator.wrap(inputPort.getOwningStage());
+		Thread thread = new Thread(runnable);
+		thread.start();
+
+		newOutputPort.sendSignal(new InitializingSignal());
+		newOutputPort.sendSignal(new StartingSignal());
+
+		// FIXME pass the new thread to the analysis so that it can terminate the thread at the end
+	}
+
 }
diff --git a/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java
index 69a537b5..49e9e88d 100644
--- a/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java
+++ b/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java
@@ -13,7 +13,6 @@ import org.junit.Test;
 
 import teetime.framework.Analysis;
 import teetime.framework.AnalysisConfiguration;
-import teetime.framework.DynamicActuator;
 import teetime.framework.Stage;
 import teetime.stage.CollectorSink;
 import teetime.stage.InitialElementProducer;
@@ -22,7 +21,6 @@ import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAct
 public class ControlledDistributorTest {
 
 	// private ControlledDistributor<Integer> controlledDistributor;
-	private final DynamicActuator dynamicActuator = new DynamicActuator();
 
 	@Before
 	public void setUp() throws Exception {
-- 
GitLab