From 48fb3a87c7c7d99136d4534e8a0773f43cbe28ed Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Sat, 1 Aug 2015 14:34:05 +0200 Subject: [PATCH] finished new context implementation --- .../framework/A1ThreadableStageCollector.java | 4 +-- .../framework/AbstractCompositeStage.java | 21 ------------- .../teetime/framework/AbstractService.java | 4 --- .../framework/RuntimeServiceFacade.java | 31 ------------------- .../java/teetime/framework/ThreadService.java | 16 +++------- .../java/teetime/framework/Traverser.java | 8 +---- .../dynamic/DynamicDistributorTest.java | 3 +- .../merger/dynamic/DynamicMergerTest.java | 4 +-- 8 files changed, 10 insertions(+), 81 deletions(-) diff --git a/src/main/java/teetime/framework/A1ThreadableStageCollector.java b/src/main/java/teetime/framework/A1ThreadableStageCollector.java index e1e5ccfe..6d3205aa 100644 --- a/src/main/java/teetime/framework/A1ThreadableStageCollector.java +++ b/src/main/java/teetime/framework/A1ThreadableStageCollector.java @@ -15,10 +15,10 @@ public class A1ThreadableStageCollector implements ITraverserVisitor { @Override public VisitorBehavior visit(final Stage stage) { - if (stage.getOwningThread() != null && !threadableStages.contains(stage)) { + if (stage.getOwningThread() != null && !threadableStages.contains(stage) && stage.getCurrentState() == StageState.CREATED) { threadableStages.add(stage); } - return VisitorBehavior.CONTINUE; + return stage.getCurrentState() == StageState.CREATED ? VisitorBehavior.CONTINUE : VisitorBehavior.STOP; } @Override diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 93c9870e..674c4d57 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -32,16 +32,6 @@ public abstract class AbstractCompositeStage { */ private static final int DEFAULT_CAPACITY = 4; - // private final ConfigurationContext context; - - public AbstractCompositeStage() { - // this.context = new ConfigurationContext(this); - } - - // ConfigurationContext getContext() { - // return context; - // } - /** * Execute this method, to add a stage to the configuration, which should be executed in a own thread. * @@ -61,7 +51,6 @@ public abstract class AbstractCompositeStage { * A string which can be used for debugging. */ protected void addThreadableStage(final Stage stage, final String threadName) { - // context.addThreadableStage(stage, threadName); AbstractRunnableStage runnable = AbstractRunnableStage.create(stage); Thread newThread = new TeeTimeThread(runnable, threadName); stage.setOwningThread(newThread); @@ -94,26 +83,16 @@ public abstract class AbstractCompositeStage { * the type of elements to be sent */ protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - // context.connectPorts(sourcePort, targetPort, capacity); connectPortsInternal(sourcePort, targetPort, capacity); } private final <T> void connectPortsInternal(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { if (sourcePort.getOwningStage().getInputPorts().size() == 0) { - // if (!threadService.getThreadableStages().containsKey(sourcePort.getOwningStage())) { if (sourcePort.getOwningStage().getOwningThread() == null) { addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId()); } } - // if (LOGGER.isWarnEnabled() && (sourcePort.getPipe() != null || targetPort.getPipe() != null)) { - // LOGGER.warn("Overwriting existing pipe while connecting stages " + - // sourcePort.getOwningStage().getId() + " and " + targetPort.getOwningStage().getId() + "."); - // } - - // addChildContext(sourcePort.getOwningStage()); - // addChildContext(targetPort.getOwningStage()); - new InstantiationPipe<T>(sourcePort, targetPort, capacity); } diff --git a/src/main/java/teetime/framework/AbstractService.java b/src/main/java/teetime/framework/AbstractService.java index c103b03c..de7a3844 100644 --- a/src/main/java/teetime/framework/AbstractService.java +++ b/src/main/java/teetime/framework/AbstractService.java @@ -14,14 +14,10 @@ abstract class AbstractService<T> { abstract void onInitialize(); - abstract void onStart(); - abstract void onExecute(); abstract void onTerminate(); abstract void onFinish(); - // abstract void merge(T source); - } diff --git a/src/main/java/teetime/framework/RuntimeServiceFacade.java b/src/main/java/teetime/framework/RuntimeServiceFacade.java index 316fd89b..2670bed1 100644 --- a/src/main/java/teetime/framework/RuntimeServiceFacade.java +++ b/src/main/java/teetime/framework/RuntimeServiceFacade.java @@ -25,36 +25,5 @@ public final class RuntimeServiceFacade { public void startWithinNewThread(final Stage previousStage, final Stage stage) { previousStage.getOwningContext().getThreadService().startStageAtRuntime(stage); - - // SignalingCounter runtimeCounter = previousStage.getOwningContext().getThreadService().getRunnableCounter(); - // SignalingCounter newCounter = stage.getOwningContext().getThreadService().getRunnableCounter(); - // runtimeCounter.inc(newCounter); - - // stage.logger.error(stage.owningContext.getThreadService().getRunnableCounter().toString()); - - // !!! stage.owningContext = XXX.owningContext !!! - - // Runnable runnable = AbstractRunnableStage.create(stage); - // Thread thread = new Thread(runnable); - // - // stage.setOwningThread(thread); - // stage.setExceptionHandler(null); - // - // thread.start(); - - // requirements: - // 1. all new threads from stage must be known to the global context - // 2. number of active threads must be increased by the stage - - // if (runnable instanceof RunnableConsumerStage) { - // // do nothing - // } else if (runnable instanceof RunnableProducerStage) { - // ((RunnableProducerStage) runnable).triggerInitializingSignal(); - // ((RunnableProducerStage) runnable).triggerStartingSignal(); - // } else { - // // TODO - // } - - // stage.onSignal(signal, inputPort); } } diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java index 10381022..88189c89 100644 --- a/src/main/java/teetime/framework/ThreadService.java +++ b/src/main/java/teetime/framework/ThreadService.java @@ -39,16 +39,17 @@ class ThreadService extends AbstractService<ThreadService> { @Override void onInitialize() { Stage startStage = configuration.getStartStage(); - initialize(startStage); - onStart(); + Set<Stage> newThreadableStages = initialize(startStage); + startThreads(newThreadableStages); + sendInitializingSignal(newThreadableStages); } void startStageAtRuntime(final Stage newStage) { - Set<Stage> newThreadableStages = initialize(newStage); + configuration.addThreadableStage(newStage); + Set<Stage> newThreadableStages = initialize(newStage); startThreads(newThreadableStages); - sendInitializingSignal(newThreadableStages); sendStartingSignal(newThreadableStages); @@ -128,13 +129,6 @@ class ThreadService extends AbstractService<ThreadService> { } } - @Override - void onStart() { - startThreads(threadableStages); - - sendInitializingSignal(threadableStages); - } - @Override void onExecute() { sendStartingSignal(threadableStages); diff --git a/src/main/java/teetime/framework/Traverser.java b/src/main/java/teetime/framework/Traverser.java index f17d0c25..7562b4c5 100644 --- a/src/main/java/teetime/framework/Traverser.java +++ b/src/main/java/teetime/framework/Traverser.java @@ -63,13 +63,7 @@ public class Traverser { public void traverse(final Stage stage) { VisitorBehavior behavior = traverserVisitor.visit(stage); - if (behavior == VisitorBehavior.STOP) { - return; - } - - if (!visitedStages.add(stage)) { - // || stage.getCurrentState() != StageState.CREATED - // do not visit (1) an already visited stage and (2) a stage that currently run (runtime visiting) + if (behavior == VisitorBehavior.STOP || !visitedStages.add(stage)) { return; } diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java index 40c6ad4d..deeb0940 100644 --- a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java +++ b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import org.junit.Ignore; import org.junit.Test; import teetime.framework.Configuration; @@ -34,7 +33,7 @@ import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; import teetime.util.framework.port.PortAction; -@Ignore +//@Ignore public class DynamicDistributorTest { @Test diff --git a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java index d175ae21..5d8cb948 100644 --- a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java +++ b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java @@ -22,19 +22,17 @@ import static org.junit.Assert.assertTrue; import java.util.Arrays; import java.util.List; -import org.junit.Ignore; import org.junit.Test; import teetime.framework.Configuration; -import teetime.framework.RuntimeServiceFacade; import teetime.framework.Execution; +import teetime.framework.RuntimeServiceFacade; import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; import teetime.stage.basic.merger.strategy.BusyWaitingRoundRobinStrategy; import teetime.util.framework.port.PortAction; -@Ignore public class DynamicMergerTest { @Test -- GitLab