diff --git a/src/main/java/teetime/framework/DynamicActuator.java b/src/main/java/teetime/framework/DynamicActuator.java index a50814d2993d7e6decf5c6bb6937664004ac51d2..2cb2423ca30962a9940094ab9cb36093e0ef3f44 100644 --- a/src/main/java/teetime/framework/DynamicActuator.java +++ b/src/main/java/teetime/framework/DynamicActuator.java @@ -15,6 +15,7 @@ */ package teetime.framework; + public class DynamicActuator { /** @@ -28,7 +29,11 @@ public class DynamicActuator { return new RunnableProducerStage(stage); } - public Runnable startWithinNewThread(final Stage stage) { + public Runnable startWithinNewThread(final Stage previousStage, final Stage stage) { + // SignalingCounter runtimeCounter = previousStage.owningContext.getThreadService().getRunnableCounter(); + // SignalingCounter newCounter = stage.owningContext.getThreadService().getRunnableCounter(); + // runtimeCounter.inc(newCounter); + Runnable runnable = wrap(stage); Thread thread = new Thread(runnable); @@ -36,6 +41,16 @@ public class DynamicActuator { stage.setExceptionHandler(null); thread.start(); + + if (runnable instanceof RunnableConsumerStage) { + // do nothing + } else if (runnable instanceof RunnableProducerStage) { + ((RunnableProducerStage) runnable).triggerInitializingSignal(); + ((RunnableProducerStage) runnable).triggerStartingSignal(); + } else { + // TODO + } + return runnable; } } diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java index 913d15bbd7642354677efa58dd4a9c65b9aa6667..7c76df6fce9d0987076241f6bd325bd1df74bd5f 100644 --- a/src/main/java/teetime/framework/ThreadService.java +++ b/src/main/java/teetime/framework/ThreadService.java @@ -114,10 +114,10 @@ class ThreadService extends AbstractService<ThreadService> { // Collection<ThreadThrowableContainer> exceptions = new ConcurrentLinkedQueue<ThreadThrowableContainer>(); List<Exception> exceptions = new ArrayList<Exception>(); - for (Stage stage : threadableStages.keySet()) { - // List<Exception> stageExceptions = stage.exceptionListener.getExceptions(); - // exceptions.addAll(stageExceptions); - } + // for (Stage stage : threadableStages.keySet()) { + // List<Exception> stageExceptions = stage.exceptionListener.getExceptions(); + // exceptions.addAll(stageExceptions); + // } return exceptions; } @@ -207,7 +207,7 @@ class ThreadService extends AbstractService<ThreadService> { @Override void merge(final ThreadService source) { threadableStages.putAll(source.getThreadableStages()); - runnableCounter.inc(source.runnableCounter); + // runnableCounter.inc(source.runnableCounter); } SignalingCounter getRunnableCounter() { diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java index 92c3db23c3708b9137903eaa1c44305df4a7666b..4f76d85b1e0f645ef51097cccc7a155ee1a6cada 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java @@ -43,14 +43,14 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { public void execute(final DynamicDistributor<T> dynamicDistributor) { OutputPort<T> newOutputPort = dynamicDistributor.getNewOutputPort(); - processOutputPort(newOutputPort); + processOutputPort(dynamicDistributor, newOutputPort); onOutputPortCreated(dynamicDistributor, newOutputPort); } - private void processOutputPort(final OutputPort<T> newOutputPort) { + private void processOutputPort(final DynamicDistributor<T> dynamicDistributor, final OutputPort<T> newOutputPort) { INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort); - DYNAMIC_ACTUATOR.startWithinNewThread(inputPort.getOwningStage()); + DYNAMIC_ACTUATOR.startWithinNewThread(dynamicDistributor, inputPort.getOwningStage()); newOutputPort.sendSignal(new InitializingSignal()); newOutputPort.sendSignal(new StartingSignal()); diff --git a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java index 64b7d4f80fd101c942734144088fe22a1c1e560a..c79eaeb22be90ef9d39fa21de76c80ca38124cfe 100644 --- a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java +++ b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java @@ -27,7 +27,6 @@ import org.junit.Test; import teetime.framework.Configuration; import teetime.framework.DynamicActuator; import teetime.framework.Execution; -import teetime.framework.RunnableProducerStage; import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; @@ -97,15 +96,12 @@ public class DynamicMergerTest { private PortAction<DynamicMerger<Integer>> createPortCreateAction(final Integer number) { final InitialElementProducer<Integer> initialElementProducer = new InitialElementProducer<Integer>(number); - final Runnable runnableStage = DYNAMIC_ACTUATOR.startWithinNewThread(initialElementProducer); PortAction<DynamicMerger<Integer>> portAction = new CreatePortAction<Integer>(initialElementProducer.getOutputPort()) { @Override public void execute(final DynamicMerger<Integer> dynamicDistributor) { super.execute(dynamicDistributor); - final RunnableProducerStage runnableProducerStage = (RunnableProducerStage) runnableStage; - runnableProducerStage.triggerInitializingSignal(); - runnableProducerStage.triggerStartingSignal(); + DYNAMIC_ACTUATOR.startWithinNewThread(dynamicDistributor, initialElementProducer); } }; return portAction;