diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java index acb1752ef08afe1795d079ee994326d5c91ff555..9ed1d4c9c351d1b04636f54229bf97e45d01789e 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java @@ -25,6 +25,7 @@ import org.jctools.queues.spec.Preference; import teetime.framework.signal.ISignal; import teetime.framework.signal.InitializingSignal; +import teetime.framework.signal.StartingSignal; import teetime.util.concurrent.queue.PCBlockingQueue; import teetime.util.concurrent.queue.putstrategy.PutStrategy; import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy; @@ -65,16 +66,19 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { } @Override - public final void waitForStartSignal() throws InterruptedException { + public final void waitForInitializingSignal() throws InterruptedException { final ISignal signal = signalQueue.take(); + if (!(signal instanceof InitializingSignal)) { + throw new IllegalStateException("Expected InitializingSignal, but was " + signal.getClass().getSimpleName()); + } cachedTargetStage.onSignal(signal, getTargetPort()); } @Override - public final void waitForInitializingSignal() throws InterruptedException { + public final void waitForStartSignal() throws InterruptedException { final ISignal signal = signalQueue.take(); - if (!(signal instanceof InitializingSignal)) { - throw new IllegalStateException("Expected InitializingSignal, but was not the first arriving signal"); + if (!(signal instanceof StartingSignal)) { + throw new IllegalStateException("Expected StartingSignal, but was " + signal.getClass().getSimpleName()); } cachedTargetStage.onSignal(signal, getTargetPort()); } diff --git a/src/main/java/teetime/framework/DynamicActuator.java b/src/main/java/teetime/framework/DynamicActuator.java new file mode 100644 index 0000000000000000000000000000000000000000..c53abdd4523874eeb75baf7d74a376316c07fe06 --- /dev/null +++ b/src/main/java/teetime/framework/DynamicActuator.java @@ -0,0 +1,8 @@ +package teetime.framework; + +public class DynamicActuator { + + public Runnable wrap(final Stage stage) { + return new RunnableConsumerStage(stage); + } +} diff --git a/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java b/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java index 84ad9b213693c072cac39a6586a1dea4bb512913..cb9237f0a7f367d4fbc3321279449d7e39f078c6 100644 --- a/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java +++ b/src/main/java/teetime/stage/basic/distributor/ControlledDistributor.java @@ -9,13 +9,15 @@ import org.jctools.queues.spec.Ordering; import org.jctools.queues.spec.Preference; import teetime.framework.AbstractStage; +import teetime.framework.DynamicActuator; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.Stage; import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution; import teetime.framework.exceptionHandling.StageException; -import teetime.framework.pipe.SingleElementPipeFactory; import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.signal.InitializingSignal; +import teetime.framework.signal.StartingSignal; import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction; import teetime.util.concurrent.queue.PCBlockingQueue; import teetime.util.concurrent.queue.putstrategy.PutStrategy; @@ -26,7 +28,6 @@ import teetime.util.concurrent.queue.takestrategy.TakeStrategy; public class ControlledDistributor<T> extends AbstractStage { private static final SpScPipeFactory spScPipeFactory = new SpScPipeFactory(); - private static final SingleElementPipeFactory intraPipeFactory = new SingleElementPipeFactory(); // private final InputPort<DynamicPortActionContainer<T>> dynamicPortActionInputPort = createInputPort(); private final InputPort<T> inputPort = createInputPort(); @@ -76,6 +77,8 @@ public class ControlledDistributor<T> extends AbstractStage { } } + private final DynamicActuator dynamicActuator = new DynamicActuator(); + private void checkForOutputPortChange(final DynamicPortActionContainer<T> dynamicPortAction) { System.out.println("" + dynamicPortAction.getDynamicPortAction()); @@ -84,8 +87,16 @@ public class ControlledDistributor<T> extends AbstractStage { Distributor<T> distributor = getDistributor(outputPort); OutputPort<T> newOutputPort = distributor.getNewOutputPort(); InputPort<T> newInputPort = dynamicPortAction.getInputPort(); - // spScPipeFactory.create(newOutputPort, newInputPort); - intraPipeFactory.create(newOutputPort, newInputPort); // FIXME should be inter, but requires sending init and start signal + spScPipeFactory.create(newOutputPort, newInputPort); + + Runnable runnable = dynamicActuator.wrap(newInputPort.getOwningStage()); + Thread thread = new Thread(runnable); + thread.start(); + + newOutputPort.sendSignal(new InitializingSignal()); + newOutputPort.sendSignal(new StartingSignal()); + + // FIXME pass the new thread to the analysis so that it can terminate the thread at the end break; case REMOVE: // TODO implement "remove port at runtime" diff --git a/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java index 42b13abb2d623ad3fdf2ddf54d7e3b9a27fa3fde..69a537b52a63a28ce6da7ab48f6a2c60551ece96 100644 --- a/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java +++ b/src/test/java/teetime/stage/basic/distributor/ControlledDistributorTest.java @@ -13,6 +13,7 @@ import org.junit.Test; import teetime.framework.Analysis; import teetime.framework.AnalysisConfiguration; +import teetime.framework.DynamicActuator; import teetime.framework.Stage; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; @@ -21,6 +22,7 @@ import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAct public class ControlledDistributorTest { // private ControlledDistributor<Integer> controlledDistributor; + private final DynamicActuator dynamicActuator = new DynamicActuator(); @Before public void setUp() throws Exception { @@ -51,8 +53,14 @@ public class ControlledDistributorTest { @SuppressWarnings("unchecked") DynamicPortActionContainer<Integer>[] inputActions = new DynamicPortActionContainer[5]; for (int i = 0; i < inputActions.length; i++) { + CollectorSink<Integer> newStage = new CollectorSink<Integer>(); + + // Runnable runnable = dynamicActuator.wrap(newStage); + // Thread thread = new Thread(runnable); + // thread.start(); + DynamicPortActionContainer<Integer> createAction = new DynamicPortActionContainer<Integer>( - DynamicPortAction.CREATE, new CollectorSink<Integer>().getInputPort()); + DynamicPortAction.CREATE, newStage.getInputPort()); inputActions[i] = createAction; } diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 19f06bd8c3cddbf8ea814766c4a841478ca9fb03..45736549e35e9f1a1bc05e9ec7307d8887a4fabb 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -21,7 +21,7 @@ </appender> <logger name="teetime" level="INFO" /> -<!-- <logger name="teetime.framework" level="TRACE" /> --> + <logger name="teetime.framework" level="TRACE" /> <!-- <logger name="teetime.framework.signal" level="TRACE" /> --> <!-- <logger name="teetime.stage" level="TRACE" /> --> <logger name="util" level="INFO" />