Skip to content
Snippets Groups Projects
Commit 19c75489 authored by Christian Wulf's avatar Christian Wulf
Browse files

completed CREATE action for dynamic distributor

parent 11e116df
No related branches found
No related tags found
No related merge requests found
...@@ -25,6 +25,7 @@ import org.jctools.queues.spec.Preference; ...@@ -25,6 +25,7 @@ import org.jctools.queues.spec.Preference;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal;
import teetime.util.concurrent.queue.PCBlockingQueue; import teetime.util.concurrent.queue.PCBlockingQueue;
import teetime.util.concurrent.queue.putstrategy.PutStrategy; import teetime.util.concurrent.queue.putstrategy.PutStrategy;
import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy; import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
...@@ -65,16 +66,19 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { ...@@ -65,16 +66,19 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe {
} }
@Override @Override
public final void waitForStartSignal() throws InterruptedException { public final void waitForInitializingSignal() throws InterruptedException {
final ISignal signal = signalQueue.take(); final ISignal signal = signalQueue.take();
if (!(signal instanceof InitializingSignal)) {
throw new IllegalStateException("Expected InitializingSignal, but was " + signal.getClass().getSimpleName());
}
cachedTargetStage.onSignal(signal, getTargetPort()); cachedTargetStage.onSignal(signal, getTargetPort());
} }
@Override @Override
public final void waitForInitializingSignal() throws InterruptedException { public final void waitForStartSignal() throws InterruptedException {
final ISignal signal = signalQueue.take(); final ISignal signal = signalQueue.take();
if (!(signal instanceof InitializingSignal)) { if (!(signal instanceof StartingSignal)) {
throw new IllegalStateException("Expected InitializingSignal, but was not the first arriving signal"); throw new IllegalStateException("Expected StartingSignal, but was " + signal.getClass().getSimpleName());
} }
cachedTargetStage.onSignal(signal, getTargetPort()); cachedTargetStage.onSignal(signal, getTargetPort());
} }
......
package teetime.framework;
public class DynamicActuator {
public Runnable wrap(final Stage stage) {
return new RunnableConsumerStage(stage);
}
}
...@@ -9,13 +9,15 @@ import org.jctools.queues.spec.Ordering; ...@@ -9,13 +9,15 @@ import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference; import org.jctools.queues.spec.Preference;
import teetime.framework.AbstractStage; import teetime.framework.AbstractStage;
import teetime.framework.DynamicActuator;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Stage; import teetime.framework.Stage;
import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution; import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
import teetime.framework.exceptionHandling.StageException; import teetime.framework.exceptionHandling.StageException;
import teetime.framework.pipe.SingleElementPipeFactory;
import teetime.framework.pipe.SpScPipeFactory; import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal;
import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction; import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAction;
import teetime.util.concurrent.queue.PCBlockingQueue; import teetime.util.concurrent.queue.PCBlockingQueue;
import teetime.util.concurrent.queue.putstrategy.PutStrategy; import teetime.util.concurrent.queue.putstrategy.PutStrategy;
...@@ -26,7 +28,6 @@ import teetime.util.concurrent.queue.takestrategy.TakeStrategy; ...@@ -26,7 +28,6 @@ import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
public class ControlledDistributor<T> extends AbstractStage { public class ControlledDistributor<T> extends AbstractStage {
private static final SpScPipeFactory spScPipeFactory = new SpScPipeFactory(); private static final SpScPipeFactory spScPipeFactory = new SpScPipeFactory();
private static final SingleElementPipeFactory intraPipeFactory = new SingleElementPipeFactory();
// private final InputPort<DynamicPortActionContainer<T>> dynamicPortActionInputPort = createInputPort(); // private final InputPort<DynamicPortActionContainer<T>> dynamicPortActionInputPort = createInputPort();
private final InputPort<T> inputPort = createInputPort(); private final InputPort<T> inputPort = createInputPort();
...@@ -76,6 +77,8 @@ public class ControlledDistributor<T> extends AbstractStage { ...@@ -76,6 +77,8 @@ public class ControlledDistributor<T> extends AbstractStage {
} }
} }
private final DynamicActuator dynamicActuator = new DynamicActuator();
private void checkForOutputPortChange(final DynamicPortActionContainer<T> dynamicPortAction) { private void checkForOutputPortChange(final DynamicPortActionContainer<T> dynamicPortAction) {
System.out.println("" + dynamicPortAction.getDynamicPortAction()); System.out.println("" + dynamicPortAction.getDynamicPortAction());
...@@ -84,8 +87,16 @@ public class ControlledDistributor<T> extends AbstractStage { ...@@ -84,8 +87,16 @@ public class ControlledDistributor<T> extends AbstractStage {
Distributor<T> distributor = getDistributor(outputPort); Distributor<T> distributor = getDistributor(outputPort);
OutputPort<T> newOutputPort = distributor.getNewOutputPort(); OutputPort<T> newOutputPort = distributor.getNewOutputPort();
InputPort<T> newInputPort = dynamicPortAction.getInputPort(); InputPort<T> newInputPort = dynamicPortAction.getInputPort();
// spScPipeFactory.create(newOutputPort, newInputPort); spScPipeFactory.create(newOutputPort, newInputPort);
intraPipeFactory.create(newOutputPort, newInputPort); // FIXME should be inter, but requires sending init and start signal
Runnable runnable = dynamicActuator.wrap(newInputPort.getOwningStage());
Thread thread = new Thread(runnable);
thread.start();
newOutputPort.sendSignal(new InitializingSignal());
newOutputPort.sendSignal(new StartingSignal());
// FIXME pass the new thread to the analysis so that it can terminate the thread at the end
break; break;
case REMOVE: case REMOVE:
// TODO implement "remove port at runtime" // TODO implement "remove port at runtime"
......
...@@ -13,6 +13,7 @@ import org.junit.Test; ...@@ -13,6 +13,7 @@ import org.junit.Test;
import teetime.framework.Analysis; import teetime.framework.Analysis;
import teetime.framework.AnalysisConfiguration; import teetime.framework.AnalysisConfiguration;
import teetime.framework.DynamicActuator;
import teetime.framework.Stage; import teetime.framework.Stage;
import teetime.stage.CollectorSink; import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer; import teetime.stage.InitialElementProducer;
...@@ -21,6 +22,7 @@ import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAct ...@@ -21,6 +22,7 @@ import teetime.stage.basic.distributor.DynamicPortActionContainer.DynamicPortAct
public class ControlledDistributorTest { public class ControlledDistributorTest {
// private ControlledDistributor<Integer> controlledDistributor; // private ControlledDistributor<Integer> controlledDistributor;
private final DynamicActuator dynamicActuator = new DynamicActuator();
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
...@@ -51,8 +53,14 @@ public class ControlledDistributorTest { ...@@ -51,8 +53,14 @@ public class ControlledDistributorTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
DynamicPortActionContainer<Integer>[] inputActions = new DynamicPortActionContainer[5]; DynamicPortActionContainer<Integer>[] inputActions = new DynamicPortActionContainer[5];
for (int i = 0; i < inputActions.length; i++) { for (int i = 0; i < inputActions.length; i++) {
CollectorSink<Integer> newStage = new CollectorSink<Integer>();
// Runnable runnable = dynamicActuator.wrap(newStage);
// Thread thread = new Thread(runnable);
// thread.start();
DynamicPortActionContainer<Integer> createAction = new DynamicPortActionContainer<Integer>( DynamicPortActionContainer<Integer> createAction = new DynamicPortActionContainer<Integer>(
DynamicPortAction.CREATE, new CollectorSink<Integer>().getInputPort()); DynamicPortAction.CREATE, newStage.getInputPort());
inputActions[i] = createAction; inputActions[i] = createAction;
} }
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
</appender> </appender>
<logger name="teetime" level="INFO" /> <logger name="teetime" level="INFO" />
<!-- <logger name="teetime.framework" level="TRACE" /> --> <logger name="teetime.framework" level="TRACE" />
<!-- <logger name="teetime.framework.signal" level="TRACE" /> --> <!-- <logger name="teetime.framework.signal" level="TRACE" /> -->
<!-- <logger name="teetime.stage" level="TRACE" /> --> <!-- <logger name="teetime.stage" level="TRACE" /> -->
<logger name="util" level="INFO" /> <logger name="util" level="INFO" />
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment