From 0d5f73b81f2943c13fd30afa2f80068934e97b55 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Sat, 1 Aug 2015 14:18:56 +0200
Subject: [PATCH] refactored Traverser

---
 .../teetime/framework/A0UnconnectedPort.java  | 12 ++-
 .../framework/A1ThreadableStageCollector.java | 24 ++----
 .../A2InvalidThreadAssignmentCheck.java       | 13 ++-
 .../framework/A3PipeInstantiation.java        | 11 ++-
 .../framework/A4StageAttributeSetter.java     |  4 +-
 .../framework/ConfigurationContext.java       | 55 +-----------
 .../java/teetime/framework/IPipeVisitor.java  | 28 -------
 .../java/teetime/framework/IPortVisitor.java  |  7 --
 .../teetime/framework/ITraverserVisitor.java  | 11 +++
 .../framework/IntraStageCollector.java        | 21 +++--
 ...ctuator.java => RuntimeServiceFacade.java} | 32 ++++---
 .../java/teetime/framework/ThreadService.java | 83 ++++++++++---------
 .../java/teetime/framework/Traverser.java     | 54 ++++++------
 .../teetime/framework/pipe/DummyPipe.java     |  8 +-
 .../distributor/dynamic/CreatePortAction.java |  5 +-
 .../java/teetime/framework/TraverserTest.java |  8 +-
 .../merger/dynamic/DynamicMergerTest.java     |  6 +-
 src/test/resources/data/output.txt            | 11 +++
 18 files changed, 187 insertions(+), 206 deletions(-)
 delete mode 100644 src/main/java/teetime/framework/IPipeVisitor.java
 delete mode 100644 src/main/java/teetime/framework/IPortVisitor.java
 create mode 100644 src/main/java/teetime/framework/ITraverserVisitor.java
 rename src/main/java/teetime/framework/{DynamicActuator.java => RuntimeServiceFacade.java} (65%)

diff --git a/src/main/java/teetime/framework/A0UnconnectedPort.java b/src/main/java/teetime/framework/A0UnconnectedPort.java
index 86ef46d1..39206bef 100644
--- a/src/main/java/teetime/framework/A0UnconnectedPort.java
+++ b/src/main/java/teetime/framework/A0UnconnectedPort.java
@@ -3,19 +3,27 @@ package teetime.framework;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import teetime.framework.Traverser.VisitorBehavior;
 import teetime.framework.pipe.DummyPipe;
 
-public class A0UnconnectedPort implements IPortVisitor {
+public class A0UnconnectedPort implements ITraverserVisitor {
 
 	private static final Logger LOGGER = LoggerFactory.getLogger(A0UnconnectedPort.class);
 
 	@Override
-	public void visit(final AbstractPort<?> port) {
+	public VisitorBehavior visit(final Stage stage) {
+		return VisitorBehavior.CONTINUE;
+	}
+
+	@Override
+	public VisitorBehavior visit(final AbstractPort<?> port) {
 		if (port.getPipe() == null) {
 			if (LOGGER.isInfoEnabled()) {
 				LOGGER.info("Unconnected output port: " + port + ". Connecting with a dummy output port.");
 			}
 			port.setPipe(DummyPipe.INSTANCE);
+			return VisitorBehavior.STOP;
 		}
+		return VisitorBehavior.CONTINUE;
 	}
 }
diff --git a/src/main/java/teetime/framework/A1ThreadableStageCollector.java b/src/main/java/teetime/framework/A1ThreadableStageCollector.java
index d19cb065..e1e5ccfe 100644
--- a/src/main/java/teetime/framework/A1ThreadableStageCollector.java
+++ b/src/main/java/teetime/framework/A1ThreadableStageCollector.java
@@ -3,33 +3,27 @@ package teetime.framework;
 import java.util.HashSet;
 import java.util.Set;
 
-import teetime.framework.pipe.IPipe;
+import teetime.framework.Traverser.VisitorBehavior;
 
-public class A1ThreadableStageCollector implements IPipeVisitor {
+public class A1ThreadableStageCollector implements ITraverserVisitor {
 
 	private final Set<Stage> threadableStages = new HashSet<Stage>();
-	private final Set<IPipe<?>> visitedPipes = new HashSet<IPipe<?>>();
 
 	public Set<Stage> getThreadableStages() {
 		return threadableStages;
 	}
 
 	@Override
-	public VisitorBehavior visit(final IPipe<?> pipe) {
-		if (visitedPipes.contains(pipe)) {
-			return VisitorBehavior.STOP;
+	public VisitorBehavior visit(final Stage stage) {
+		if (stage.getOwningThread() != null && !threadableStages.contains(stage)) {
+			threadableStages.add(stage);
 		}
-		visitedPipes.add(pipe);
-
-		collectThreadableStage(pipe.getSourcePort().getOwningStage());
-		collectThreadableStage(pipe.getTargetPort().getOwningStage());
-
 		return VisitorBehavior.CONTINUE;
 	}
 
-	private void collectThreadableStage(final Stage stage) {
-		if (stage.getOwningThread() != null && !threadableStages.contains(stage)) {
-			threadableStages.add(stage);
-		}
+	@Override
+	public VisitorBehavior visit(final AbstractPort<?> port) {
+		return VisitorBehavior.CONTINUE;
 	}
+
 }
diff --git a/src/main/java/teetime/framework/A2InvalidThreadAssignmentCheck.java b/src/main/java/teetime/framework/A2InvalidThreadAssignmentCheck.java
index c04006f8..84624943 100644
--- a/src/main/java/teetime/framework/A2InvalidThreadAssignmentCheck.java
+++ b/src/main/java/teetime/framework/A2InvalidThreadAssignmentCheck.java
@@ -2,6 +2,7 @@ package teetime.framework;
 
 import java.util.Set;
 
+import teetime.framework.Traverser.VisitorBehavior;
 import teetime.framework.pipe.IPipe;
 
 import com.carrotsearch.hppc.ObjectIntHashMap;
@@ -31,7 +32,7 @@ public class A2InvalidThreadAssignmentCheck {
 		}
 	}
 
-	private static class ThreadPainter implements IPipeVisitor {
+	private static class ThreadPainter implements ITraverserVisitor {
 
 		private final ObjectIntMap<Stage> colors;
 		private final int color;
@@ -45,8 +46,16 @@ public class A2InvalidThreadAssignmentCheck {
 		}
 
 		@Override
-		public VisitorBehavior visit(final IPipe<?> pipe) {
+		public VisitorBehavior visit(final Stage stage) {
+			return VisitorBehavior.CONTINUE;
+		}
+
+		@Override
+		public VisitorBehavior visit(final AbstractPort<?> port) {
+			IPipe<?> pipe = port.getPipe();
+			// FIXME line below requires FORWARD. should be independent of the used direction
 			Stage targetStage = pipe.getTargetPort().getOwningStage();
+
 			int targetColor = colors.containsKey(targetStage) ? colors.get(targetStage) : DEFAULT_COLOR;
 
 			if (threadableStages.contains(targetStage) && targetColor != color) {
diff --git a/src/main/java/teetime/framework/A3PipeInstantiation.java b/src/main/java/teetime/framework/A3PipeInstantiation.java
index 1e1e38c4..18250c23 100644
--- a/src/main/java/teetime/framework/A3PipeInstantiation.java
+++ b/src/main/java/teetime/framework/A3PipeInstantiation.java
@@ -3,6 +3,7 @@ package teetime.framework;
 import java.util.HashSet;
 import java.util.Set;
 
+import teetime.framework.Traverser.VisitorBehavior;
 import teetime.framework.pipe.IPipe;
 import teetime.framework.pipe.IPipeFactory;
 import teetime.framework.pipe.InstantiationPipe;
@@ -10,7 +11,7 @@ import teetime.framework.pipe.SingleElementPipeFactory;
 import teetime.framework.pipe.SpScPipeFactory;
 import teetime.framework.pipe.UnboundedSpScPipeFactory;
 
-public class A3PipeInstantiation implements IPipeVisitor {
+public class A3PipeInstantiation implements ITraverserVisitor {
 
 	private static final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory();
 	private static final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory();
@@ -19,7 +20,13 @@ public class A3PipeInstantiation implements IPipeVisitor {
 	private final Set<IPipe<?>> visitedPipes = new HashSet<IPipe<?>>();
 
 	@Override
-	public VisitorBehavior visit(final IPipe<?> pipe) {
+	public VisitorBehavior visit(final Stage stage) {
+		return VisitorBehavior.CONTINUE;
+	}
+
+	@Override
+	public VisitorBehavior visit(final AbstractPort<?> port) {
+		IPipe<?> pipe = port.getPipe();
 		if (visitedPipes.contains(pipe)) {
 			return VisitorBehavior.STOP;
 		}
diff --git a/src/main/java/teetime/framework/A4StageAttributeSetter.java b/src/main/java/teetime/framework/A4StageAttributeSetter.java
index 613d69ef..60a3a54e 100644
--- a/src/main/java/teetime/framework/A4StageAttributeSetter.java
+++ b/src/main/java/teetime/framework/A4StageAttributeSetter.java
@@ -20,8 +20,8 @@ public class A4StageAttributeSetter {
 	}
 
 	private void setAttributes(final Stage threadableStage) {
-		IPipeVisitor pipeVisitor = new IntraStageCollector();
-		Traverser traverser = new Traverser(pipeVisitor);
+		IntraStageCollector visitor = new IntraStageCollector(threadableStage);
+		Traverser traverser = new Traverser(visitor);
 		traverser.traverse(threadableStage);
 
 		setAttributes(threadableStage, traverser.getVisitedStages());
diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java
index 38c9d9cc..8c356665 100644
--- a/src/main/java/teetime/framework/ConfigurationContext.java
+++ b/src/main/java/teetime/framework/ConfigurationContext.java
@@ -41,63 +41,10 @@ final class ConfigurationContext {
 		return threadService.getThreadableStages();
 	}
 
-	/**
-	 * @see AbstractCompositeStage#addThreadableStage(Stage)
-	 */
-	// final void addThreadableStage(final Stage stage, final String threadName) {
-	// addChildContext(stage);
-	// threadService.addThreadableStage(stage, threadName);
-	// }
-
-	/**
-	 * @see AbstractCompositeStage#connectPorts(OutputPort, InputPort, int)
-	 */
-	// final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
-	// if (sourcePort.getOwningStage().getInputPorts().size() == 0) {
-	// if (!threadService.getThreadableStages().containsKey(sourcePort.getOwningStage())) {
-	// addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId());
-	// }
-	// }
-	//
-	// if (LOGGER.isWarnEnabled() && (sourcePort.getPipe() != null || targetPort.getPipe() != null)) {
-	// LOGGER.warn("Overwriting existing pipe while connecting stages " +
-	// sourcePort.getOwningStage().getId() + " and " + targetPort.getOwningStage().getId() + ".");
-	// }
-	//
-	// addChildContext(sourcePort.getOwningStage());
-	// addChildContext(targetPort.getOwningStage());
-	//
-	// new InstantiationPipe(sourcePort, targetPort, capacity);
-	// }
-
-	// final void addChildContext(final Stage stage) {
-	// if (!stage.owningContext.equals(EMPTY_CONTEXT)) {
-	// if (stage.owningContext != this) { // Performance
-	// children.add(stage.owningContext);
-	// }
-	// } else {
-	// stage.owningContext = this;
-	// }
-	// }
-	//
-	// final void initializeContext() {
-	// for (ConfigurationContext child : children) {
-	// child.initializeContext();
-	// mergeContexts(child);
-	// }
-	// }
-
-	final void initializeServices() {
+	void initializeServices() {
 		threadService.onInitialize();
 	}
 
-	// private void mergeContexts(final ConfigurationContext child) {
-	// threadService.merge(child.getThreadService());
-	//
-	// // Finally copy parent services
-	// child.threadService = this.threadService;
-	// }
-
 	void executeConfiguration() {
 		this.threadService.onExecute();
 	}
diff --git a/src/main/java/teetime/framework/IPipeVisitor.java b/src/main/java/teetime/framework/IPipeVisitor.java
deleted file mode 100644
index 9b884a7a..00000000
--- a/src/main/java/teetime/framework/IPipeVisitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package teetime.framework;
-
-import teetime.framework.pipe.IPipe;
-
-public interface IPipeVisitor {
-
-	public enum VisitorBehavior {
-		CONTINUE, STOP
-	}
-
-	VisitorBehavior visit(IPipe<?> pipe);
-
-}
diff --git a/src/main/java/teetime/framework/IPortVisitor.java b/src/main/java/teetime/framework/IPortVisitor.java
deleted file mode 100644
index 28f314b3..00000000
--- a/src/main/java/teetime/framework/IPortVisitor.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package teetime.framework;
-
-public interface IPortVisitor {
-
-	void visit(AbstractPort<?> port);
-
-}
diff --git a/src/main/java/teetime/framework/ITraverserVisitor.java b/src/main/java/teetime/framework/ITraverserVisitor.java
new file mode 100644
index 00000000..9f2c078d
--- /dev/null
+++ b/src/main/java/teetime/framework/ITraverserVisitor.java
@@ -0,0 +1,11 @@
+package teetime.framework;
+
+import teetime.framework.Traverser.VisitorBehavior;
+
+public interface ITraverserVisitor {
+
+	VisitorBehavior visit(Stage stage);
+
+	VisitorBehavior visit(AbstractPort<?> port);
+
+}
diff --git a/src/main/java/teetime/framework/IntraStageCollector.java b/src/main/java/teetime/framework/IntraStageCollector.java
index cd0fbede..40ccaa5b 100644
--- a/src/main/java/teetime/framework/IntraStageCollector.java
+++ b/src/main/java/teetime/framework/IntraStageCollector.java
@@ -15,18 +15,29 @@
  */
 package teetime.framework;
 
-import teetime.framework.pipe.IPipe;
+import teetime.framework.Traverser.VisitorBehavior;
 
-public class IntraStageCollector implements IPipeVisitor {
+public class IntraStageCollector implements ITraverserVisitor {
 
-	public IntraStageCollector() {}
+	private final Stage startStage;
+
+	public IntraStageCollector(final Stage startStage) {
+		super();
+		this.startStage = startStage;
+	}
 
 	@Override
-	public VisitorBehavior visit(final IPipe<?> outputPipe) {
-		if (outputPipe instanceof AbstractIntraThreadPipe) {
+	public VisitorBehavior visit(final Stage stage) {
+		if (stage == startStage || stage.getOwningThread() == null /* before execution */
+				|| stage.getOwningThread() == startStage.getOwningThread() /* while execution */) {
 			return VisitorBehavior.CONTINUE;
 		}
 		return VisitorBehavior.STOP;
 	}
 
+	@Override
+	public VisitorBehavior visit(final AbstractPort<?> port) {
+		return VisitorBehavior.CONTINUE;
+	}
+
 }
diff --git a/src/main/java/teetime/framework/DynamicActuator.java b/src/main/java/teetime/framework/RuntimeServiceFacade.java
similarity index 65%
rename from src/main/java/teetime/framework/DynamicActuator.java
rename to src/main/java/teetime/framework/RuntimeServiceFacade.java
index 060ba464..316fd89b 100644
--- a/src/main/java/teetime/framework/DynamicActuator.java
+++ b/src/main/java/teetime/framework/RuntimeServiceFacade.java
@@ -15,10 +15,16 @@
  */
 package teetime.framework;
 
-public class DynamicActuator {
+public final class RuntimeServiceFacade {
 
-	public Runnable startWithinNewThread(final Stage previousStage, final Stage stage) {
-		previousStage.getOwningContext().getThreadService().onInitialize();
+	public static final RuntimeServiceFacade INSTANCE = new RuntimeServiceFacade();
+
+	private RuntimeServiceFacade() {
+		// singleton
+	}
+
+	public void startWithinNewThread(final Stage previousStage, final Stage stage) {
+		previousStage.getOwningContext().getThreadService().startStageAtRuntime(stage);
 
 		// SignalingCounter runtimeCounter = previousStage.getOwningContext().getThreadService().getRunnableCounter();
 		// SignalingCounter newCounter = stage.getOwningContext().getThreadService().getRunnableCounter();
@@ -28,7 +34,7 @@ public class DynamicActuator {
 
 		// !!! stage.owningContext = XXX.owningContext !!!
 
-		Runnable runnable = AbstractRunnableStage.create(stage);
+		// Runnable runnable = AbstractRunnableStage.create(stage);
 		// Thread thread = new Thread(runnable);
 		//
 		// stage.setOwningThread(thread);
@@ -40,15 +46,15 @@ public class DynamicActuator {
 		// 1. all new threads from stage must be known to the global context
 		// 2. number of active threads must be increased by the stage
 
-		if (runnable instanceof RunnableConsumerStage) {
-			// do nothing
-		} else if (runnable instanceof RunnableProducerStage) {
-			((RunnableProducerStage) runnable).triggerInitializingSignal();
-			((RunnableProducerStage) runnable).triggerStartingSignal();
-		} else {
-			// TODO
-		}
+		// if (runnable instanceof RunnableConsumerStage) {
+		// // do nothing
+		// } else if (runnable instanceof RunnableProducerStage) {
+		// ((RunnableProducerStage) runnable).triggerInitializingSignal();
+		// ((RunnableProducerStage) runnable).triggerStartingSignal();
+		// } else {
+		// // TODO
+		// }
 
-		return runnable;
+		// stage.onSignal(signal, inputPort);
 	}
 }
diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java
index 340acbc4..10381022 100644
--- a/src/main/java/teetime/framework/ThreadService.java
+++ b/src/main/java/teetime/framework/ThreadService.java
@@ -1,6 +1,7 @@
 package teetime.framework;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
@@ -27,9 +28,9 @@ class ThreadService extends AbstractService<ThreadService> {
 	private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>();
 
 	private final SignalingCounter runnableCounter = new SignalingCounter();
-	private final Configuration configuration;
+	private final Set<Stage> threadableStages = new HashSet<Stage>();
 
-	private Set<Stage> threadableStages;
+	private final Configuration configuration;
 
 	public ThreadService(final Configuration configuration) {
 		this.configuration = configuration;
@@ -43,20 +44,34 @@ class ThreadService extends AbstractService<ThreadService> {
 		onStart();
 	}
 
+	void startStageAtRuntime(final Stage newStage) {
+		Set<Stage> newThreadableStages = initialize(newStage);
+
+		startThreads(newThreadableStages);
+
+		sendInitializingSignal(newThreadableStages);
+
+		sendStartingSignal(newThreadableStages);
+	}
+
 	// extracted for runtime use
-	void initialize(final Stage startStage) {
+	private Set<Stage> initialize(final Stage startStage) {
 		if (startStage == null) {
 			throw new IllegalStateException("The start stage may not be null.");
 		}
 
-		// TODO visit(port) only
 		// TODO use decorator pattern to combine all analyzes so that only one traverser pass is necessary
-		IPortVisitor portVisitor = new A0UnconnectedPort();
+		A0UnconnectedPort portVisitor = new A0UnconnectedPort();
+		Traverser traversor = new Traverser(portVisitor, Direction.BOTH);
+		traversor.traverse(startStage);
+
 		A1ThreadableStageCollector stageCollector = new A1ThreadableStageCollector();
-		Traverser traversor = new Traverser(portVisitor, stageCollector, Direction.BOTH);
+		traversor = new Traverser(stageCollector, Direction.BOTH);
 		traversor.traverse(startStage);
 
-		threadableStages.addAll(stageCollector.getThreadableStages());
+		Set<Stage> newThreadableStages = stageCollector.getThreadableStages();
+
+		threadableStages.addAll(newThreadableStages);
 		if (threadableStages.isEmpty()) {
 			throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage.");
 		}
@@ -64,7 +79,7 @@ class ThreadService extends AbstractService<ThreadService> {
 		A2InvalidThreadAssignmentCheck checker = new A2InvalidThreadAssignmentCheck(threadableStages);
 		checker.check();
 
-		IPipeVisitor pipeVisitor = new A3PipeInstantiation();
+		A3PipeInstantiation pipeVisitor = new A3PipeInstantiation();
 		traversor = new Traverser(pipeVisitor, Direction.BOTH);
 		traversor.traverse(startStage);
 
@@ -74,6 +89,8 @@ class ThreadService extends AbstractService<ThreadService> {
 		for (Stage stage : threadableStages) {
 			categorizeThreadableStage(stage);
 		}
+
+		return newThreadableStages;
 	}
 
 	private void categorizeThreadableStage(final Stage stage) {
@@ -93,18 +110,34 @@ class ThreadService extends AbstractService<ThreadService> {
 		}
 	}
 
+	private void startThreads(final Set<Stage> threadableStages) {
+		for (Stage stage : threadableStages) {
+			stage.getOwningThread().start();
+		}
+	}
+
+	private void sendInitializingSignal(final Set<Stage> threadableStages) {
+		for (Stage stage : threadableStages) {
+			((TeeTimeThread) stage.getOwningThread()).sendInitializingSignal();
+		}
+	}
+
+	private void sendStartingSignal(final Set<Stage> newThreadableStages) {
+		for (Stage stage : newThreadableStages) {
+			((TeeTimeThread) stage.getOwningThread()).sendStartingSignal();
+		}
+	}
+
 	@Override
 	void onStart() {
-		startThreads(this.consumerThreads);
-		startThreads(this.finiteProducerThreads);
-		startThreads(this.infiniteProducerThreads);
+		startThreads(threadableStages);
 
-		sendInitializingSignal();
+		sendInitializingSignal(threadableStages);
 	}
 
 	@Override
 	void onExecute() {
-		sendStartingSignal();
+		sendStartingSignal(threadableStages);
 	}
 
 	@Override
@@ -153,30 +186,6 @@ class ThreadService extends AbstractService<ThreadService> {
 		return exceptions;
 	}
 
-	private void startThreads(final Iterable<Thread> threads) {
-		for (Thread thread : threads) {
-			thread.start();
-		}
-	}
-
-	private void sendInitializingSignal() {
-		for (Thread thread : infiniteProducerThreads) {
-			((TeeTimeThread) thread).sendInitializingSignal();
-		}
-		for (Thread thread : finiteProducerThreads) {
-			((TeeTimeThread) thread).sendInitializingSignal();
-		}
-	}
-
-	private void sendStartingSignal() {
-		for (Thread thread : infiniteProducerThreads) {
-			((TeeTimeThread) thread).sendStartingSignal();
-		}
-		for (Thread thread : finiteProducerThreads) {
-			((TeeTimeThread) thread).sendStartingSignal();
-		}
-	}
-
 	Set<Stage> getThreadableStages() {
 		return threadableStages;
 	}
diff --git a/src/main/java/teetime/framework/Traverser.java b/src/main/java/teetime/framework/Traverser.java
index ffd4cde1..f17d0c25 100644
--- a/src/main/java/teetime/framework/Traverser.java
+++ b/src/main/java/teetime/framework/Traverser.java
@@ -18,10 +18,15 @@ package teetime.framework;
 import java.util.HashSet;
 import java.util.Set;
 
-import teetime.framework.IPipeVisitor.VisitorBehavior;
 import teetime.framework.pipe.DummyPipe;
-import teetime.framework.pipe.IPipe;
 
+/**
+ * Traverses the all stages that are <b>reachable</b> from the given <i>stage</i>.
+ * Each stage is visited exactly once (not more, not less).
+ *
+ * @author Christian Wulf
+ *
+ */
 public class Traverser {
 
 	public static enum Direction {
@@ -38,34 +43,32 @@ public class Traverser {
 		}
 	}
 
-	private static final IPortVisitor DEFAULT_PORT_VISITOR = new IPortVisitor() {
-		@Override
-		public void visit(final AbstractPort<?> port) {
-			// do nothing
-		}
-	};
+	public static enum VisitorBehavior {
+		CONTINUE, STOP;
+	}
 
-	private final IPortVisitor portVisitor;
-	private final IPipeVisitor pipeVisitor;
-	private final Direction direction;
 	private final Set<Stage> visitedStages = new HashSet<Stage>();
 
-	public Traverser(final IPipeVisitor pipeVisitor) {
-		this(pipeVisitor, Direction.FORWARD);
-	}
+	private final ITraverserVisitor traverserVisitor;
+	private final Direction direction;
 
-	public Traverser(final IPipeVisitor pipeVisitor, final Direction direction) {
-		this(DEFAULT_PORT_VISITOR, pipeVisitor, direction);
+	public Traverser(final ITraverserVisitor traverserVisitor) {
+		this(traverserVisitor, Direction.FORWARD);
 	}
 
-	public Traverser(final IPortVisitor portVisitor, final IPipeVisitor pipeVisitor, final Direction direction) {
-		this.portVisitor = portVisitor;
-		this.pipeVisitor = pipeVisitor;
+	public Traverser(final ITraverserVisitor traverserVisitor, final Direction direction) {
+		this.traverserVisitor = traverserVisitor;
 		this.direction = direction;
 	}
 
 	public void traverse(final Stage stage) {
-		if (!visitedStages.add(stage) || stage.getCurrentState() != StageState.CREATED) {
+		VisitorBehavior behavior = traverserVisitor.visit(stage);
+		if (behavior == VisitorBehavior.STOP) {
+			return;
+		}
+
+		if (!visitedStages.add(stage)) {
+			// || stage.getCurrentState() != StageState.CREATED
 			// do not visit (1) an already visited stage and (2) a stage that currently run (runtime visiting)
 			return;
 		}
@@ -84,10 +87,13 @@ public class Traverser {
 	}
 
 	private void visitAndTraverse(final AbstractPort<?> port, final Direction direction) {
-		portVisitor.visit(port);
-		IPipe<?> pipe = port.getPipe();
-		if (pipe != DummyPipe.INSTANCE && pipeVisitor.visit(pipe) == VisitorBehavior.CONTINUE) {
-			AbstractPort<?> nextPort = (direction == Direction.FORWARD) ? pipe.getTargetPort() : pipe.getSourcePort();
+		if (port.getPipe() instanceof DummyPipe) {
+			return;
+		}
+		VisitorBehavior behavior = traverserVisitor.visit(port);
+
+		if (behavior == VisitorBehavior.CONTINUE) {
+			AbstractPort<?> nextPort = (direction == Direction.FORWARD) ? port.getPipe().getTargetPort() : port.getPipe().getSourcePort();
 			traverse(nextPort.getOwningStage()); // recursive call
 		}
 	}
diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java
index a41f9233..a74d68dd 100644
--- a/src/main/java/teetime/framework/pipe/DummyPipe.java
+++ b/src/main/java/teetime/framework/pipe/DummyPipe.java
@@ -25,9 +25,9 @@ import teetime.framework.signal.ISignal;
  * @author Christian Wulf
  *
  */
-public final class DummyPipe<T> implements IPipe<T> {
+public final class DummyPipe implements IPipe<Object> {
 
-	public static final IPipe<?> INSTANCE = new DummyPipe<Object>();
+	public static final IPipe<?> INSTANCE = new DummyPipe();
 
 	private DummyPipe() {
 		// singleton
@@ -59,12 +59,12 @@ public final class DummyPipe<T> implements IPipe<T> {
 	}
 
 	@Override
-	public OutputPort<? extends T> getSourcePort() {
+	public OutputPort<? extends Object> getSourcePort() {
 		return null;
 	}
 
 	@Override
-	public InputPort<T> getTargetPort() {
+	public InputPort<Object> getTargetPort() {
 		return null;
 	}
 
diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java
index 4f76d85b..bc6faa47 100644
--- a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java
+++ b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java
@@ -18,7 +18,7 @@ package teetime.stage.basic.distributor.dynamic;
 import java.util.ArrayList;
 import java.util.List;
 
-import teetime.framework.DynamicActuator;
+import teetime.framework.RuntimeServiceFacade;
 import teetime.framework.InputPort;
 import teetime.framework.OutputPort;
 import teetime.framework.pipe.SpScPipeFactory;
@@ -29,7 +29,6 @@ import teetime.util.framework.port.PortAction;
 public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> {
 
 	private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory();
-	private static final DynamicActuator DYNAMIC_ACTUATOR = new DynamicActuator();
 
 	private final InputPort<T> inputPort;
 
@@ -50,7 +49,7 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> {
 	private void processOutputPort(final DynamicDistributor<T> dynamicDistributor, final OutputPort<T> newOutputPort) {
 		INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort);
 
-		DYNAMIC_ACTUATOR.startWithinNewThread(dynamicDistributor, inputPort.getOwningStage());
+		RuntimeServiceFacade.INSTANCE.startWithinNewThread(dynamicDistributor, inputPort.getOwningStage());
 
 		newOutputPort.sendSignal(new InitializingSignal());
 		newOutputPort.sendSignal(new StartingSignal());
diff --git a/src/test/java/teetime/framework/TraverserTest.java b/src/test/java/teetime/framework/TraverserTest.java
index e8272835..27878602 100644
--- a/src/test/java/teetime/framework/TraverserTest.java
+++ b/src/test/java/teetime/framework/TraverserTest.java
@@ -37,12 +37,12 @@ import teetime.stage.util.CountingMap;
 
 public class TraverserTest {
 
-	private final Traverser traversor = new Traverser(new IntraStageCollector());
-
 	@Test
 	public void traverse() {
 		TestConfiguration tc = new TestConfiguration();
 		new Execution<TestConfiguration>(tc);
+
+		Traverser traversor = new Traverser(new IntraStageCollector(tc.init));
 		traversor.traverse(tc.init);
 
 		Set<Stage> comparingStages = new HashSet<Stage>();
@@ -56,9 +56,8 @@ public class TraverserTest {
 	}
 
 	// WordCounterConfiguration
-	private class TestConfiguration extends Configuration {
+	private static class TestConfiguration extends Configuration {
 
-		public final CountingMapMerger<String> result = new CountingMapMerger<String>();
 		public final InitialElementProducer<File> init;
 		public final File2SeqOfWords f2b;
 		public Distributor<String> distributor;
@@ -68,6 +67,7 @@ public class TraverserTest {
 			init = new InitialElementProducer<File>(new File(""));
 			f2b = new File2SeqOfWords("UTF-8", 512);
 			distributor = new Distributor<String>(new RoundRobinStrategy2());
+			CountingMapMerger<String> result = new CountingMapMerger<String>();
 
 			// last part
 			final Merger<CountingMap<String>> merger = new Merger<CountingMap<String>>();
diff --git a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java
index 8abc8a9c..d175ae21 100644
--- a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java
+++ b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java
@@ -26,7 +26,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import teetime.framework.Configuration;
-import teetime.framework.DynamicActuator;
+import teetime.framework.RuntimeServiceFacade;
 import teetime.framework.Execution;
 import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory;
 import teetime.stage.CollectorSink;
@@ -37,8 +37,6 @@ import teetime.util.framework.port.PortAction;
 @Ignore
 public class DynamicMergerTest {
 
-	private static final DynamicActuator DYNAMIC_ACTUATOR = new DynamicActuator();
-
 	@Test
 	public void shouldWorkWithoutActionTriggers() throws Exception {
 		List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4, 5);
@@ -103,7 +101,7 @@ public class DynamicMergerTest {
 			@Override
 			public void execute(final DynamicMerger<Integer> dynamicDistributor) {
 				super.execute(dynamicDistributor);
-				DYNAMIC_ACTUATOR.startWithinNewThread(dynamicDistributor, initialElementProducer);
+				RuntimeServiceFacade.INSTANCE.startWithinNewThread(dynamicDistributor, initialElementProducer);
 			}
 		};
 		return portAction;
diff --git a/src/test/resources/data/output.txt b/src/test/resources/data/output.txt
index e69de29b..5c016264 100644
--- a/src/test/resources/data/output.txt
+++ b/src/test/resources/data/output.txt
@@ -0,0 +1,11 @@
+Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
+Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.
+Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi.
+Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat.
+Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis.
+At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores duo eirmod eos erat, et nonumy sed tempor et et invidunt justo labore Stet clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero voluptua. est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat.
+Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus.
+Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
+Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.
+Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi.
+Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo
\ No newline at end of file
-- 
GitLab