Skip to content
Snippets Groups Projects
Commit 944c0c85 authored by Christian Wulf's avatar Christian Wulf
Browse files

prepared DynamicActuator

parent a302e1da
No related branches found
No related tags found
No related merge requests found
......@@ -15,6 +15,7 @@
*/
package teetime.framework;
public class DynamicActuator {
/**
......@@ -28,7 +29,11 @@ public class DynamicActuator {
return new RunnableProducerStage(stage);
}
public Runnable startWithinNewThread(final Stage stage) {
public Runnable startWithinNewThread(final Stage previousStage, final Stage stage) {
// SignalingCounter runtimeCounter = previousStage.owningContext.getThreadService().getRunnableCounter();
// SignalingCounter newCounter = stage.owningContext.getThreadService().getRunnableCounter();
// runtimeCounter.inc(newCounter);
Runnable runnable = wrap(stage);
Thread thread = new Thread(runnable);
......@@ -36,6 +41,16 @@ public class DynamicActuator {
stage.setExceptionHandler(null);
thread.start();
if (runnable instanceof RunnableConsumerStage) {
// do nothing
} else if (runnable instanceof RunnableProducerStage) {
((RunnableProducerStage) runnable).triggerInitializingSignal();
((RunnableProducerStage) runnable).triggerStartingSignal();
} else {
// TODO
}
return runnable;
}
}
......@@ -114,10 +114,10 @@ class ThreadService extends AbstractService<ThreadService> {
// Collection<ThreadThrowableContainer> exceptions = new ConcurrentLinkedQueue<ThreadThrowableContainer>();
List<Exception> exceptions = new ArrayList<Exception>();
for (Stage stage : threadableStages.keySet()) {
// List<Exception> stageExceptions = stage.exceptionListener.getExceptions();
// exceptions.addAll(stageExceptions);
}
// for (Stage stage : threadableStages.keySet()) {
// List<Exception> stageExceptions = stage.exceptionListener.getExceptions();
// exceptions.addAll(stageExceptions);
// }
return exceptions;
}
......@@ -207,7 +207,7 @@ class ThreadService extends AbstractService<ThreadService> {
@Override
void merge(final ThreadService source) {
threadableStages.putAll(source.getThreadableStages());
runnableCounter.inc(source.runnableCounter);
// runnableCounter.inc(source.runnableCounter);
}
SignalingCounter getRunnableCounter() {
......
......@@ -43,14 +43,14 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> {
public void execute(final DynamicDistributor<T> dynamicDistributor) {
OutputPort<T> newOutputPort = dynamicDistributor.getNewOutputPort();
processOutputPort(newOutputPort);
processOutputPort(dynamicDistributor, newOutputPort);
onOutputPortCreated(dynamicDistributor, newOutputPort);
}
private void processOutputPort(final OutputPort<T> newOutputPort) {
private void processOutputPort(final DynamicDistributor<T> dynamicDistributor, final OutputPort<T> newOutputPort) {
INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort);
DYNAMIC_ACTUATOR.startWithinNewThread(inputPort.getOwningStage());
DYNAMIC_ACTUATOR.startWithinNewThread(dynamicDistributor, inputPort.getOwningStage());
newOutputPort.sendSignal(new InitializingSignal());
newOutputPort.sendSignal(new StartingSignal());
......
......@@ -27,7 +27,6 @@ import org.junit.Test;
import teetime.framework.Configuration;
import teetime.framework.DynamicActuator;
import teetime.framework.Execution;
import teetime.framework.RunnableProducerStage;
import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
......@@ -97,15 +96,12 @@ public class DynamicMergerTest {
private PortAction<DynamicMerger<Integer>> createPortCreateAction(final Integer number) {
final InitialElementProducer<Integer> initialElementProducer = new InitialElementProducer<Integer>(number);
final Runnable runnableStage = DYNAMIC_ACTUATOR.startWithinNewThread(initialElementProducer);
PortAction<DynamicMerger<Integer>> portAction = new CreatePortAction<Integer>(initialElementProducer.getOutputPort()) {
@Override
public void execute(final DynamicMerger<Integer> dynamicDistributor) {
super.execute(dynamicDistributor);
final RunnableProducerStage runnableProducerStage = (RunnableProducerStage) runnableStage;
runnableProducerStage.triggerInitializingSignal();
runnableProducerStage.triggerStartingSignal();
DYNAMIC_ACTUATOR.startWithinNewThread(dynamicDistributor, initialElementProducer);
}
};
return portAction;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment