diff --git a/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java b/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java index cb9237f0a7f367d4fbc3321279449d7e39f078c6..fc66d3c8d9aa441a8020328e0575513415816091 100644 --- a/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java +++ b/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java @@ -9,15 +9,11 @@ import org.jctools.queues.spec.Ordering; import org.jctools.queues.spec.Preference; import teetime.framework.AbstractStage; -import teetime.framework.DynamicActuator; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.Stage; import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution; import teetime.framework.exceptionHandling.StageException; -import teetime.framework.pipe.SpScPipeFactory; -import teetime.framework.signal.InitializingSignal; -import teetime.framework.signal.StartingSignal; import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction; import teetime.util.concurrent.queue.PCBlockingQueue; import teetime.util.concurrent.queue.putstrategy.PutStrategy; @@ -27,8 +23,6 @@ import teetime.util.concurrent.queue.takestrategy.TakeStrategy; public class ControlledDistributor<T> extends AbstractStage { - private static final SpScPipeFactory spScPipeFactory = new SpScPipeFactory(); - // private final InputPort<DynamicPortActionContainer<T>> dynamicPortActionInputPort = createInputPort(); private final InputPort<T> inputPort = createInputPort(); @@ -77,8 +71,6 @@ public class ControlledDistributor<T> extends AbstractStage { } } - private final DynamicActuator dynamicActuator = new DynamicActuator(); - private void checkForOutputPortChange(final DynamicPortActionContainer<T> dynamicPortAction) { System.out.println("" + dynamicPortAction.getDynamicPortAction()); @@ -86,17 +78,7 @@ public class ControlledDistributor<T> extends AbstractStage { case CREATE: Distributor<T> distributor = getDistributor(outputPort); OutputPort<T> newOutputPort = distributor.getNewOutputPort(); - InputPort<T> newInputPort = dynamicPortAction.getInputPort(); - spScPipeFactory.create(newOutputPort, newInputPort); - - Runnable runnable = dynamicActuator.wrap(newInputPort.getOwningStage()); - Thread thread = new Thread(runnable); - thread.start(); - - newOutputPort.sendSignal(new InitializingSignal()); - newOutputPort.sendSignal(new StartingSignal()); - - // FIXME pass the new thread to the analysis so that it can terminate the thread at the end + dynamicPortAction.execute(newOutputPort); break; case REMOVE: // TODO implement "remove port at runtime" diff --git a/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java b/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java index a7e4c9f12cbad063cfda8e65b7b64bfbd440de85..7aeb2cf20c469956d6d1fde59a62c47ee09f4153 100644 --- a/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java +++ b/src/main/java/teetime/stage/basic/distributor/DynamicDistributor.java @@ -1,26 +1,44 @@ package teetime.stage.basic.distributor; -import teetime.framework.InputPort; +import java.util.Queue; + +import org.jctools.queues.QueueFactory; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.jctools.queues.spec.Preference; + import teetime.framework.OutputPort; -import teetime.framework.pipe.SpScPipeFactory; import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction; +import teetime.util.concurrent.queue.PCBlockingQueue; +import teetime.util.concurrent.queue.putstrategy.PutStrategy; +import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy; +import teetime.util.concurrent.queue.takestrategy.SCParkTakeStrategy; +import teetime.util.concurrent.queue.takestrategy.TakeStrategy; public class DynamicDistributor<T> extends Distributor<T> { - private static final SpScPipeFactory spScPipeFactory = new SpScPipeFactory(); + private final PCBlockingQueue<DynamicPortActionContainer<T>> actions; - @SuppressWarnings("rawtypes") - private final InputPort<DynamicPortActionContainer> dynamicPortActionInputPort = createInputPort(DynamicPortActionContainer.class); + public DynamicDistributor() { + final Queue<DynamicPortActionContainer<T>> localQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT)); + final PutStrategy<DynamicPortActionContainer<T>> putStrategy = new YieldPutStrategy<DynamicPortActionContainer<T>>(); + final TakeStrategy<DynamicPortActionContainer<T>> takeStrategy = new SCParkTakeStrategy<DynamicPortActionContainer<T>>(); + actions = new PCBlockingQueue<DynamicPortActionContainer<T>>(localQueue, putStrategy, takeStrategy); + } - @SuppressWarnings("unchecked") @Override protected void execute(final T element) { - DynamicPortActionContainer<T> dynamicPortAction = dynamicPortActionInputPort.receive(); + checkForPendingPortActionRequest(); + + super.execute(element); + } + + private void checkForPendingPortActionRequest() { + DynamicPortActionContainer<T> dynamicPortAction = actions.poll(); switch (dynamicPortAction.getDynamicPortAction()) { case CREATE: OutputPort<T> newOutputPort = createOutputPort(); - InputPort<T> newInputPort = dynamicPortAction.getInputPort(); - spScPipeFactory.create(newOutputPort, newInputPort); + dynamicPortAction.execute(newOutputPort); break; case REMOVE: // TODO implement "remove port at runtime" @@ -31,7 +49,5 @@ public class DynamicDistributor<T> extends Distributor<T> { } break; } - - super.execute(element); } } diff --git a/src/main/java/teetime/stage/basic/distributor/DynamicPortActionContainer.java b/src/main/java/teetime/stage/basic/distributor/DynamicPortActionContainer.java index 57847fd31ba7822b6fb2acfdd1d42f7cba009646..d57e9abee2345b500cbeedbdf195799b45310d53 100644 --- a/src/main/java/teetime/stage/basic/distributor/DynamicPortActionContainer.java +++ b/src/main/java/teetime/stage/basic/distributor/DynamicPortActionContainer.java @@ -1,9 +1,18 @@ package teetime.stage.basic.distributor; +import teetime.framework.DynamicActuator; import teetime.framework.InputPort; +import teetime.framework.OutputPort; +import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.signal.InitializingSignal; +import teetime.framework.signal.StartingSignal; public class DynamicPortActionContainer<T> { + private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory(); + + private final DynamicActuator dynamicActuator = new DynamicActuator(); + public enum DynamicPortAction { CREATE, REMOVE; } @@ -25,4 +34,17 @@ public class DynamicPortActionContainer<T> { return inputPort; } + public void execute(final OutputPort<T> newOutputPort) { + INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort); + + Runnable runnable = dynamicActuator.wrap(inputPort.getOwningStage()); + Thread thread = new Thread(runnable); + thread.start(); + + newOutputPort.sendSignal(new InitializingSignal()); + newOutputPort.sendSignal(new StartingSignal()); + + // FIXME pass the new thread to the analysis so that it can terminate the thread at the end + } + } diff --git a/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java index 69a537b52a63a28ce6da7ab48f6a2c60551ece96..49e9e88da0db8f537dfc964c99fdcac3caed5a59 100644 --- a/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java +++ b/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java @@ -13,7 +13,6 @@ import org.junit.Test; import teetime.framework.Analysis; import teetime.framework.AnalysisConfiguration; -import teetime.framework.DynamicActuator; import teetime.framework.Stage; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; @@ -22,7 +21,6 @@ import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAct public class ControlledDistributorTest { // private ControlledDistributor<Integer> controlledDistributor; - private final DynamicActuator dynamicActuator = new DynamicActuator(); @Before public void setUp() throws Exception {