diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
index ab8a4be910f9e580b59f809a8232dff499113356..3eff9cfc26a607fb29555c9622dbb9a5a9042323 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
@@ -87,15 +87,6 @@ public abstract class AbstractStage implements StageWithPort {
 		this.parentStage = parentStage;
 	}
 
-	@Override
-	public boolean isReschedulable() {
-		return this.reschedulable;
-	}
-
-	public void setReschedulable(final boolean reschedulable) {
-		this.reschedulable = reschedulable;
-	}
-
 	@Override
 	public String getId() {
 		return this.id;
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java
index 234a1c598e14e83a5af80213ccbf26a46cdc0f86..ee22541cb63b3309d3d04b96d7b8a466fc506d59 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java
@@ -21,17 +21,17 @@ public class Analysis {
 	}
 
 	public void init() {
-		for (StageWithPort stage : this.configuration.getConsumerStages()) {
+		for (HeadStage stage : this.configuration.getConsumerStages()) {
 			Thread thread = new Thread(new RunnableStage(stage));
 			this.consumerThreads.add(thread);
 		}
 
-		for (StageWithPort stage : this.configuration.getFiniteProducerStages()) {
+		for (HeadStage stage : this.configuration.getFiniteProducerStages()) {
 			Thread thread = new Thread(new RunnableStage(stage));
 			this.finiteProducerThreads.add(thread);
 		}
 
-		for (StageWithPort stage : this.configuration.getInfiniteProducerStages()) {
+		for (HeadStage stage : this.configuration.getInfiniteProducerStages()) {
 			Thread thread = new Thread(new RunnableStage(stage));
 			this.infiniteProducerThreads.add(thread);
 		}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Configuration.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Configuration.java
index fcac06f4dc51f84c16afa9fd64f3923d131012cc..97667505787dbd220f793f3dcf18d104fcb46bab 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Configuration.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Configuration.java
@@ -5,19 +5,19 @@ import java.util.List;
 
 public class Configuration {
 
-	private final List<StageWithPort> consumerStages = new LinkedList<StageWithPort>();
-	private final List<StageWithPort> finiteProducerStages = new LinkedList<StageWithPort>();
-	private final List<StageWithPort> infiniteProducerStages = new LinkedList<StageWithPort>();
+	private final List<HeadStage> consumerStages = new LinkedList<HeadStage>();
+	private final List<HeadStage> finiteProducerStages = new LinkedList<HeadStage>();
+	private final List<HeadStage> infiniteProducerStages = new LinkedList<HeadStage>();
 
-	public List<StageWithPort> getConsumerStages() {
+	public List<HeadStage> getConsumerStages() {
 		return this.consumerStages;
 	}
 
-	public List<StageWithPort> getFiniteProducerStages() {
+	public List<HeadStage> getFiniteProducerStages() {
 		return this.finiteProducerStages;
 	}
 
-	public List<StageWithPort> getInfiniteProducerStages() {
+	public List<HeadStage> getInfiniteProducerStages() {
 		return this.infiniteProducerStages;
 	}
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java
index f64c50f3fc485409f628533876626fc9e618002d..36c4893b3f937be74bd2553a13bc730607e447a6 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java
@@ -12,9 +12,6 @@ public abstract class ConsumerStage<I> extends AbstractStage {
 	public void executeWithPorts() {
 		I element = this.getInputPort().receive();
 
-		boolean isReschedulable = this.determineReschedulability();
-		this.setReschedulable(isReschedulable);
-
 		this.execute(element);
 	}
 
@@ -23,10 +20,6 @@ public abstract class ConsumerStage<I> extends AbstractStage {
 		// do nothing
 	}
 
-	protected boolean determineReschedulability() {
-		return this.getInputPort().getPipe().size() > 0;
-	}
-
 	protected abstract void execute(I element);
 
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/HeadPipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/HeadPipeline.java
new file mode 100644
index 0000000000000000000000000000000000000000..f4e3e8678bcff46771ae511083a2fdf8a9682ebe
--- /dev/null
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/HeadPipeline.java
@@ -0,0 +1,14 @@
+package teetime.variant.methodcallWithPorts.framework.core;
+
+public class HeadPipeline<FirstStage extends HeadStage, LastStage extends StageWithPort> extends Pipeline<FirstStage, LastStage> implements HeadStage {
+
+	@Override
+	public boolean shouldBeTerminated() {
+		return this.firstStage.shouldBeTerminated();
+	}
+
+	@Override
+	public void terminate() {
+		this.firstStage.terminate();
+	}
+}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/HeadStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/HeadStage.java
new file mode 100644
index 0000000000000000000000000000000000000000..2c80f08c4adfeb83b27ce5eb094a98e663d614d2
--- /dev/null
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/HeadStage.java
@@ -0,0 +1,8 @@
+package teetime.variant.methodcallWithPorts.framework.core;
+
+public interface HeadStage extends StageWithPort {
+
+	boolean shouldBeTerminated();
+
+	void terminate();
+}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
index 3ce89029e00b078f8b964ead6bb7ff2975da6b60..5d06dd476525b392cda71d173c874ea1c8f770aa 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
@@ -4,15 +4,6 @@ import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
 
 public final class OutputPort<T> extends AbstractPort<T> {
 
-	/**
-	 * Performance cache: Avoids the following method chain
-	 *
-	 * <pre>
-	 * this.getPipe().getTargetPort().getOwningStage()
-	 * </pre>
-	 */
-	// private StageWithPort cachedTargetStage;
-
 	OutputPort() {
 		super();
 	}
@@ -26,15 +17,6 @@ public final class OutputPort<T> extends AbstractPort<T> {
 		return this.pipe.add(element);
 	}
 
-	// public StageWithPort getCachedTargetStage() {
-	// return this.cachedTargetStage;
-	// }
-
-	@Deprecated
-	public void setCachedTargetStage(final StageWithPort cachedTargetStage) {
-		// this.cachedTargetStage = cachedTargetStage;
-	}
-
 	public void sendSignal(final Signal signal) {
 		this.pipe.setSignal(signal);
 	}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java
index 018f39e01b56fe302a927dab29cd9e2f89d3356f..6d063ea831650c80b18e2895382cc1b9200013ab 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java
@@ -1,150 +1,54 @@
 package teetime.variant.methodcallWithPorts.framework.core;
 
-import java.util.Arrays;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.UUID;
 
 import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
-import teetime.variant.methodcallWithPorts.framework.core.signal.StartingSignal;
 import teetime.variant.methodcallWithPorts.framework.core.validation.InvalidPortConnection;
 
-import kieker.common.logging.Log;
-import kieker.common.logging.LogFactory;
-
-/**
- *
- * @author Christian Wulf
- *
- * @param <FirstStage>
- * @param <LastStage>
- */
-// BETTER remove the pipeline since it does not add any new functionality
 public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageWithPort> implements StageWithPort {
 
-	private final String id;
-	/**
-	 * A unique logger instance per stage instance
-	 */
-	protected Log logger;
+	protected FirstStage firstStage;
+	protected LastStage lastStage;
 
-	private FirstStage firstStage;
-	private final List<StageWithPort> intermediateStages = new LinkedList<StageWithPort>();
-	private LastStage lastStage;
-
-	private StageWithPort parentStage;
-
-	// private final Set<StageWithPort<?, ?>> currentHeads = new HashSet<StageWithPort<?, ?>>();
-
-	public Pipeline() {
-		this(UUID.randomUUID().toString());
-	}
-
-	public Pipeline(final String id) {
-		this.id = id; // the id should only be represented by a UUID, not additionally by the class name
-		this.logger = LogFactory.getLog(this.id);
-	}
-
-	@Override
-	public String getId() {
-		return this.id;
-	}
-
-	public void setFirstStage(final FirstStage stage) {
-		this.firstStage = stage;
+	public FirstStage getFirstStage() {
+		return this.firstStage;
 	}
 
-	public void addIntermediateStages(final StageWithPort... stages) {
-		this.intermediateStages.addAll(Arrays.asList(stages));
+	public void setFirstStage(final FirstStage firstStage) {
+		this.firstStage = firstStage;
 	}
 
-	public void addIntermediateStage(final StageWithPort stage) {
-		this.intermediateStages.add(stage);
+	public LastStage getLastStage() {
+		return this.lastStage;
 	}
 
-	public void setLastStage(final LastStage stage) {
-		this.lastStage = stage;
+	public void setLastStage(final LastStage lastStage) {
+		this.lastStage = lastStage;
 	}
 
 	@Override
-	public void executeWithPorts() {
-		StageWithPort headStage = this.firstStage;
-
-		// do {
-		headStage.executeWithPorts();
-		// } while (headStage.isReschedulable());
-
-		// headStage.sendFinishedSignalToAllSuccessorStages();
-
-		// this.updateRescheduable(headStage);
-
-		// this.setReschedulable(headStage.isReschedulable());
+	public String getId() {
+		return this.firstStage.getId();
 	}
 
-	// private final void updateRescheduable(final StageWithPort<?, ?> stage) {
-	// StageWithPort<?, ?> currentStage = stage;
-	// do {
-	// this.firstStageIndex++;
-	// // currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port?
-	// // if (currentStage == null) { // loop reaches the last stage
-	// if (this.firstStageIndex == this.stages.length) { // loop reaches the last stage
-	// this.setReschedulable(false);
-	// this.cleanUp();
-	// return;
-	// }
-	// currentStage = this.stages[this.firstStageIndex];
-	// currentStage.onIsPipelineHead();
-	// } while (!currentStage.isReschedulable());
-	//
-	// this.setReschedulable(true);
-	// }
-
 	@Override
-	public void onIsPipelineHead() {
-		// do nothing
-	}
-
-	@Deprecated
-	public void onStarting() {
-		int size = 1 + this.intermediateStages.size() + 1;
-		StageWithPort[] stages = new StageWithPort[size];
-		stages[0] = this.firstStage;
-		for (int i = 0; i < this.intermediateStages.size(); i++) {
-			StageWithPort stage = this.intermediateStages.get(i);
-			stages[1 + i] = stage;
-		}
-		stages[stages.length - 1] = this.lastStage;
-
-		// for (int i = 0; i < this.stages.length; i++) {
-		// StageWithPort<?, ?> stage = this.stages[i];
-		// stage.setParentStage(this, i);
-		// stage.setListener(this);
-		// }
-
-		// for (int i = 0; i < this.stages.length - 1; i++) {
-		// StageWithPort stage = this.stages[i];
-		// stage.setSuccessor(this.stages[i + 1]);
-		// }
-		// this.stages[this.stages.length - 1].setSuccessor(new EndStage<Object>());
-
-		for (StageWithPort stage : stages) {
-			stage.onSignal(new StartingSignal(), null);
-		}
+	public void executeWithPorts() {
+		this.firstStage.executeWithPorts();
 	}
 
 	@Override
 	public StageWithPort getParentStage() {
-		return this.parentStage;
+		return this.firstStage.getParentStage();
 	}
 
 	@Override
 	public void setParentStage(final StageWithPort parentStage, final int index) {
-		this.parentStage = parentStage;
+		this.firstStage.setParentStage(parentStage, index);
 	}
 
 	@Override
-	public boolean isReschedulable() {
-		return this.firstStage.isReschedulable();
+	public void onIsPipelineHead() {
+		this.firstStage.onIsPipelineHead();
 	}
 
 	@Override
@@ -152,17 +56,9 @@ public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageW
 		this.firstStage.onSignal(signal, inputPort);
 	}
 
-	public FirstStage getFirstStage() {
-		return this.firstStage;
-	}
-
-	public LastStage getLastStage() {
-		return this.lastStage;
-	}
-
 	@Override
 	public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
-		// do nothing
+		this.lastStage.validateOutputPorts(invalidPortConnections);
 	}
 
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java
index 5a9492154d93e0fca7837199b676bddcf783b0ae..37f95facd4b39076b68d2102810cbce0b695cf02 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java
@@ -13,18 +13,15 @@ package teetime.variant.methodcallWithPorts.framework.core;
  *            the type of the default output port
  *
  */
-public abstract class ProducerStage<O> extends AbstractStage {
+public abstract class ProducerStage<O> extends AbstractStage implements HeadStage {
 
 	protected final OutputPort<O> outputPort = this.createOutputPort();
+	private boolean shouldTerminate;
 
 	public final OutputPort<O> getOutputPort() {
 		return this.outputPort;
 	}
 
-	public ProducerStage() {
-		this.setReschedulable(true);
-	}
-
 	@Override
 	public void executeWithPorts() {
 		this.execute();
@@ -35,6 +32,16 @@ public abstract class ProducerStage<O> extends AbstractStage {
 		// do nothing
 	}
 
+	@Override
+	public void terminate() {
+		this.shouldTerminate = true;
+	}
+
+	@Override
+	public boolean shouldBeTerminated() {
+		return this.shouldTerminate;
+	}
+
 	protected abstract void execute();
 
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java
index 3c61059f8366553c2df19e735d307214b354e169..8298ebaebad317741cf53308e5fd22bbd8bfedc8 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java
@@ -10,11 +10,11 @@ import teetime.variant.methodcallWithPorts.framework.core.validation.AnalysisNot
 
 public class RunnableStage implements Runnable {
 
-	private final StageWithPort stage;
+	private final HeadStage stage;
 	private final Logger logger;
 	private boolean validationEnabled;
 
-	public RunnableStage(final StageWithPort stage) {
+	public RunnableStage(final HeadStage stage) {
 		this.stage = stage;
 		this.logger = LoggerFactory.getLogger(stage.getClass());
 	}
@@ -37,7 +37,7 @@ public class RunnableStage implements Runnable {
 
 			do {
 				this.stage.executeWithPorts();
-			} while (this.stage.isReschedulable());
+			} while (!this.stage.shouldBeTerminated());
 
 			TerminatingSignal terminatingSignal = new TerminatingSignal();
 			this.stage.onSignal(terminatingSignal, null);
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java
index 9753bcdf0bc4819e798317e3bbae88abb150fb82..ffd5d0600f4189aee587535978858b6e2c7c3349 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java
@@ -15,14 +15,6 @@ public interface StageWithPort {
 
 	void setParentStage(StageWithPort parentStage, int index);
 
-	// void setListener(OnDisableListener listener);
-
-	/**
-	 * @return <code>true</code> iff this stage makes progress when it is re-executed by the scheduler, otherwise <code>false</code>.<br>
-	 *         For example, many stages are re-schedulable if at least one of their input ports are not empty.
-	 */
-	boolean isReschedulable();
-
 	// BETTER remove this method since it will be replaced by onTerminating()
 	void onIsPipelineHead();
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java
index e914afef5501ec8223e9aeddda7dcd88ef90fe76..5d7f3f511e6e3c69e5132028771fc5362d650564 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java
@@ -1,6 +1,7 @@
 package teetime.variant.methodcallWithPorts.framework.core.pipe;
 
 import teetime.variant.methodcallWithPorts.framework.core.InputPort;
+import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 
 public abstract class AbstractPipe<T> implements IPipe<T> {
@@ -27,4 +28,10 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
 		this.cachedTargetStage = targetPort.getOwningStage();
 	}
 
+	@Override
+	public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
+		sourcePort.setPipe(this);
+		targetPort.setPipe(this);
+	}
+
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
index 270556dc16a2f816963d8d78510a833dad94c4f1..7a6b4aa0019fb32fc0b2acce7fc0e71a7df9a9dc 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
@@ -24,13 +24,6 @@ public class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> {
 		pipe.connectPorts(sourcePort, targetPort);
 	}
 
-	@Override
-	public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
-		sourcePort.setPipe(this);
-		targetPort.setPipe(this);
-		sourcePort.setCachedTargetStage(targetPort.getOwningStage());
-	}
-
 	@Override
 	public boolean add(final T element) {
 		this.elements.put(this.tail++, element);
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java
index 4c22e36cf97f8a4b6d0c9cace4bcebfecd6675b6..8bf61cecfd561019d6a67939c4f7f6f134f96486 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java
@@ -23,13 +23,6 @@ public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> {
 		pipe.connectPorts(sourcePort, targetPort);
 	}
 
-	@Override
-	public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
-		sourcePort.setPipe(this);
-		targetPort.setPipe(this);
-		sourcePort.setCachedTargetStage(targetPort.getOwningStage());
-	}
-
 	@Override
 	public boolean add(final T element) {
 		return this.elements.offer(element);
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java
index eefd95f7462f9af1e96ed007d4d83f751d29b5e4..2dd45c930656c604aa28426dbf55b33a3211d10c 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java
@@ -14,13 +14,6 @@ public class Pipe<T> extends IntraThreadPipe<T> {
 		pipe.connectPorts(sourcePort, targetPort);
 	}
 
-	@Override
-	public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
-		sourcePort.setPipe(this);
-		targetPort.setPipe(this);
-		sourcePort.setCachedTargetStage(targetPort.getOwningStage());
-	}
-
 	/*
 	 * (non-Javadoc)
 	 *
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
index aebe4b589decebb337e83567caf4740f9f30c174..96eeba861d7374fc668a0f76984156a29c23bedd 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
@@ -13,13 +13,6 @@ public class SingleElementPipe<T> extends IntraThreadPipe<T> {
 		pipe.connectPorts(sourcePort, targetPort);
 	}
 
-	@Override
-	public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
-		sourcePort.setPipe(this);
-		targetPort.setPipe(this);
-		sourcePort.setCachedTargetStage(targetPort.getOwningStage());
-	}
-
 	@Override
 	public boolean add(final T element) {
 		this.element = element;
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
index 22b5c6a41ee5a25916f4e3c5644a9b134b738745..3e6f46941f00927ea63eedde5c66354495ec6dd4 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
@@ -31,13 +31,6 @@ public class SpScPipe<T> extends AbstractPipe<T> {
 		return pipe;
 	}
 
-	@Override
-	public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
-		targetPort.setPipe(this);
-		sourcePort.setPipe(this);
-		sourcePort.setCachedTargetStage(targetPort.getOwningStage());
-	}
-
 	@Override
 	public boolean add(final T element) {
 		// BETTER introduce a QueueIsFullStrategy
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
index 71cc7e87fb9b436ebb46fc7b2276408d9638d2c6..3befc87d87b8b09f6a107de743eb9f5c3915e171 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
@@ -27,7 +27,6 @@ public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
 	public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
 		sourcePort.setPipe(this);
 		targetPort.setPipe(this);
-		sourcePort.setCachedTargetStage(targetPort.getOwningStage());
 	}
 
 	@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java
index 1520b6097a364afe9086be53e49cce37d84e12a8..e140fed534db7c90f00c1e970176a57340f2d6ca 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java
@@ -26,7 +26,7 @@ public class Clock extends ProducerStage<Long> {
 		try {
 			Thread.sleep(delayInMs);
 		} catch (InterruptedException e) {
-			this.setReschedulable(false);
+			this.terminate();
 		}
 	}
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java
index 028d8709239c43e1a6659465d8044af7acc0e093..599d9fb490911009ce4f088a897f9c123bcc4b85 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java
@@ -20,7 +20,7 @@ import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
 
 /**
  * @author Christian Wulf
- * 
+ *
  * @since 1.10
  */
 public class ObjectProducer<T> extends ProducerStage<T> {
@@ -60,13 +60,12 @@ public class ObjectProducer<T> extends ProducerStage<T> {
 		newObject = this.inputObjectCreator.create();
 		this.numInputObjects--;
 
-		if (this.numInputObjects == 0) {
-			this.setReschedulable(false);
-			// this.getOutputPort().pipe.close();
-		}
-
 		// System.out.println(this.getClass().getSimpleName() + ": sending " + this.numInputObjects);
 		this.send(this.outputPort, newObject);
+
+		if (this.numInputObjects == 0) {
+			this.terminate();
+		}
 	}
 
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java
index d0e281fdceb2469876c8857da0f77392c434c9a1..d31041fd342afe845c28c0f5d72d010b38eefbd1 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java
@@ -16,8 +16,7 @@ public class Relay<T> extends ProducerStage<T> {
 		T element = this.inputPort.receive();
 		if (null == element) {
 			if (this.cachedCastedInputPipe.getSignal() instanceof TerminatingSignal) {
-				this.setReschedulable(false);
-				assert 0 == this.inputPort.getPipe().size();
+				this.terminate();
 			}
 			Thread.yield();
 			return;
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/Delay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/Delay.java
index 5b234be8a879ef539f43f0eda71bfe7793e36eae..55ee676c01c7986604fbd14af439608c4c24590b 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/Delay.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/Delay.java
@@ -24,15 +24,13 @@ public class Delay<T> extends AbstractStage {
 			T element = this.inputPort.receive();
 			this.send(this.outputPort, element);
 		}
-
-		// this.setReschedulable(this.getInputPort().pipe.size() > 0);
-		this.setReschedulable(false);
-		// System.out.println("delay: " + this.getInputPort().pipe.size());
 	}
 
 	@Override
 	public void onIsPipelineHead() {
-		this.setReschedulable(true);
+		while (!this.inputPort.getPipe().isEmpty()) {
+			this.executeWithPorts();
+		}
 	}
 
 	public InputPort<T> getInputPort() {
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java
index 7f469d4f856faa5b953ee9539e3ee35a9150a372..bf90f0a521cb473445f6f1ea8c8603c40ee889eb 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java
@@ -48,15 +48,6 @@ public class Merger<T> extends AbstractStage {
 		}
 
 		this.send(this.outputPort, token);
-
-		boolean isReschedulable = false;
-		for (InputPort<?> inputPort : this.getInputPorts()) {
-			if (!inputPort.getPipe().isEmpty()) {
-				isReschedulable = true;
-				break;
-			}
-		}
-		this.setReschedulable(isReschedulable);
 	}
 
 	@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/explorviz/KiekerRecordTcpReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/explorviz/KiekerRecordTcpReader.java
index a49946d3938e3ab04a99da48dad76c3833990e29..ecbef3d56a2c59f024197bceeabcdcbc5936effa 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/explorviz/KiekerRecordTcpReader.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/explorviz/KiekerRecordTcpReader.java
@@ -87,7 +87,7 @@ public class KiekerRecordTcpReader extends ProducerStage<IMonitoringRecord> {
 				}
 			}
 
-			this.setReschedulable(false);
+			this.terminate();
 		}
 	}
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java
index decfc7f31cd057d5e60d96f95754e83706e5f5c5..3e09b86235c97e6db5959dda0c2b8670f51dbe7f 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java
@@ -165,7 +165,7 @@ public class TCPReader extends ProducerStage<IMonitoringRecord> {
 				}
 			}
 
-			this.setReschedulable(false);
+			this.terminate();
 			this.tcpStringReader.terminate();
 		}
 	}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/Dir2RecordsFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/Dir2RecordsFilter.java
index d0735a73daa42322e5924ae6d8e024b2df61e2c1..d6759a263a4163b48e4287fe73f8dfe3c9e43b14 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/Dir2RecordsFilter.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/Dir2RecordsFilter.java
@@ -20,6 +20,10 @@ import java.io.File;
 import teetime.variant.methodcallWithPorts.framework.core.InputPort;
 import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
 import teetime.variant.methodcallWithPorts.stage.FileExtensionSwitch;
 import teetime.variant.methodcallWithPorts.stage.basic.merger.Merger;
@@ -36,11 +40,12 @@ import kieker.common.util.filesystem.FSUtil;
 
 /**
  * @author Christian Wulf
- * 
+ *
  * @since 1.10
  */
 public class Dir2RecordsFilter extends Pipeline<ClassNameRegistryCreationFilter, Merger<IMonitoringRecord>> {
 
+	private final PipeFactory pipeFactory = new PipeFactory();
 	private ClassNameRegistryRepository classNameRegistryRepository;
 
 	/**
@@ -68,8 +73,11 @@ public class Dir2RecordsFilter extends Pipeline<ClassNameRegistryCreationFilter,
 		final OutputPort<File> zipFileOutputPort = fileExtensionSwitch.addFileExtension(FSUtil.ZIP_FILE_EXTENSION);
 
 		// connect ports by pipes
-		SingleElementPipe.connect(classNameRegistryCreationFilter.getOutputPort(), directory2FilesFilter.getInputPort());
-		SingleElementPipe.connect(directory2FilesFilter.getOutputPort(), fileExtensionSwitch.getInputPort());
+		IPipe<File> pipe = this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1);
+		pipe.connectPorts(classNameRegistryCreationFilter.getOutputPort(), directory2FilesFilter.getInputPort());
+
+		pipe = this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1);
+		pipe.connectPorts(directory2FilesFilter.getOutputPort(), fileExtensionSwitch.getInputPort());
 
 		SingleElementPipe.connect(normalFileOutputPort, datFile2RecordFilter.getInputPort());
 		SingleElementPipe.connect(binFileOutputPort, binaryFile2RecordFilter.getInputPort());
@@ -81,11 +89,6 @@ public class Dir2RecordsFilter extends Pipeline<ClassNameRegistryCreationFilter,
 
 		// prepare pipeline
 		this.setFirstStage(classNameRegistryCreationFilter);
-		this.addIntermediateStage(directory2FilesFilter);
-		this.addIntermediateStage(fileExtensionSwitch);
-		this.addIntermediateStage(datFile2RecordFilter);
-		this.addIntermediateStage(binaryFile2RecordFilter);
-		this.addIntermediateStage(zipFile2RecordFilter);
 		this.setLastStage(recordMerger);
 	}
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithBin2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithBin2RecordFilter.java
index c305843472faa90cadfd2566a5ae97686f5a0204..86e015aa2b8c5dab5ce0c78dc3e8c81f5d4cbd16 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithBin2RecordFilter.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithBin2RecordFilter.java
@@ -24,7 +24,6 @@ public class DirWithBin2RecordFilter extends Pipeline<ClassNameRegistryCreationF
 		final BinaryFile2RecordFilter binaryFile2RecordFilter = new BinaryFile2RecordFilter(classNameRegistryRepository);
 
 		this.setFirstStage(classNameRegistryCreationFilter);
-		this.addIntermediateStage(directory2FilesFilter);
 		this.setLastStage(binaryFile2RecordFilter);
 	}
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithDat2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithDat2RecordFilter.java
index cfa54ecfaaa32632850df48b36a134db415812ac..d636a07df6e9ca2cbb54d2d0e8d4d0a91e314181 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithDat2RecordFilter.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithDat2RecordFilter.java
@@ -24,7 +24,6 @@ public class DirWithDat2RecordFilter extends Pipeline<ClassNameRegistryCreationF
 		final DatFile2RecordFilter datFile2RecordFilter = new DatFile2RecordFilter(classNameRegistryRepository);
 
 		this.setFirstStage(classNameRegistryCreationFilter);
-		this.addIntermediateStage(directory2FilesFilter);
 		this.setLastStage(datFile2RecordFilter);
 	}
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java
index af0c71fad9d84bf4ea78c4ea8093c30cd83432ad..4bd7ef0a7cab4dad716f381d8123a76ac31e0d42 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java
@@ -144,7 +144,7 @@ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> {
 				}
 			}
 
-			this.setReschedulable(false);
+			this.terminate();
 		}
 	}
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java
index 749548bba8f697f013b88b2b80774347bd472437..704ceffa8804a32b570a066e44a6453afb074bb2 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java
@@ -29,7 +29,7 @@ import kieker.common.record.IMonitoringRecord;
 
 /**
  * @author Christian Wulf
- * 
+ *
  * @since 1.10
  */
 public class DatFile2RecordFilter extends Pipeline<File2TextLinesFilter, TextLine2RecordFilter> {
diff --git a/src/main/java/util/KiekerLoadDriver.java b/src/main/java/util/KiekerLoadDriver.java
index e30aa0674a73d2d37d7f926ce37e3fca69c5a7ac..659a9cf27517c840cacf57afdfc4894cab53c2fc 100644
--- a/src/main/java/util/KiekerLoadDriver.java
+++ b/src/main/java/util/KiekerLoadDriver.java
@@ -14,9 +14,9 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadStage;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
-import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
 import teetime.variant.methodcallWithPorts.stage.CollectorSink;
@@ -36,17 +36,17 @@ public class KiekerLoadDriver {
 	private long[] timings;
 
 	public KiekerLoadDriver(final File directory) {
-		StageWithPort producerPipeline = this.buildProducerPipeline(directory);
+		HeadStage producerPipeline = this.buildProducerPipeline(directory);
 		this.runnableStage = new RunnableStage(producerPipeline);
 	}
 
-	private StageWithPort buildProducerPipeline(final File directory) {
+	private HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> buildProducerPipeline(final File directory) {
 		ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository();
 		// create stages
 		Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository);
 		CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection);
 
-		final Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>();
+		final HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>();
 		pipeline.setFirstStage(dir2RecordsFilter);
 		pipeline.setLastStage(collector);
 
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java
index 6b2a9378342364001ca044dc632f18e7b42d201a..bd2ae16ec401b7ad0956e7388b7e0b33a9d1601a 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java
@@ -20,7 +20,7 @@ import java.util.List;
 import teetime.util.ConstructorClosure;
 import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
 import teetime.variant.explicitScheduling.framework.core.Analysis;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.Pipe;
@@ -66,7 +66,7 @@ public class MethodCallThroughputAnalysis9 extends Analysis {
 		final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
 		final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects);
 
-		final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
+		final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
 		pipeline.setFirstStage(objectProducer);
 		pipeline.addIntermediateStage(startTimestampFilter);
 		pipeline.addIntermediateStages(noopFilters);
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java
index 19dc2d6e76534bfc5e661654826f20938cf1fc70..bd08ab4adb42720cb349f999d22a497e6345573d 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java
@@ -20,7 +20,7 @@ import java.util.List;
 import teetime.util.ConstructorClosure;
 import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
 import teetime.variant.explicitScheduling.framework.core.Analysis;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
 import teetime.variant.methodcallWithPorts.stage.CollectorSink;
@@ -64,7 +64,7 @@ public class MethodCallThroughputAnalysis10 extends Analysis {
 		final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
 		final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects);
 
-		final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
+		final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
 		pipeline.setFirstStage(objectProducer);
 		pipeline.addIntermediateStage(startTimestampFilter);
 		pipeline.addIntermediateStages(noopFilters);
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java
index 2d57a02c6c1eb479daef864faf0cedf3954a8f9a..ce10958f8b0e7ae222be61f1f7f1a005c88b13db 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java
@@ -20,7 +20,7 @@ import java.util.List;
 import teetime.util.ConstructorClosure;
 import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
 import teetime.variant.explicitScheduling.framework.core.Analysis;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe;
@@ -64,7 +64,7 @@ public class MethodCallThroughputAnalysis11 extends Analysis {
 		final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
 		final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects);
 
-		final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
+		final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
 		pipeline.setFirstStage(objectProducer);
 		// pipeline.addIntermediateStage(relayFake);
 		pipeline.addIntermediateStage(startTimestampFilter);
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java
index 8f50cbe13880243478686ba947f21d75637b5448..1cd3f96376c31bffcd013971f60923ada4207f0d 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java
@@ -20,7 +20,7 @@ import java.util.List;
 import teetime.util.ConstructorClosure;
 import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
 import teetime.variant.explicitScheduling.framework.core.Analysis;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
@@ -68,7 +68,7 @@ public class MethodCallThroughputAnalysis14 extends Analysis {
 		final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
 		final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects);
 
-		final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
+		final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
 		pipeline.setFirstStage(objectProducer);
 		pipeline.addIntermediateStage(startTimestampFilter);
 		pipeline.addIntermediateStages(noopFilters);
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java
index 4fb5db966f945e91348ab691fe53662eb875546a..702c3cc443e84a932f090abade6e335dbb79766d 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java
@@ -20,7 +20,7 @@ import java.util.List;
 import teetime.util.ConstructorClosure;
 import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
 import teetime.variant.explicitScheduling.framework.core.Analysis;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipe;
@@ -71,7 +71,7 @@ public class MethodCallThroughputAnalysis15 extends Analysis {
 		this.clock.setInitialDelayInMs(100);
 		this.clock.setIntervalDelayInMs(100);
 
-		final Pipeline<Clock, Sink<Long>> pipeline = new Pipeline<Clock, Sink<Long>>();
+		final HeadPipeline<Clock, Sink<Long>> pipeline = new HeadPipeline<Clock, Sink<Long>>();
 		pipeline.setFirstStage(this.clock);
 		pipeline.setLastStage(new Sink<Long>());
 
@@ -95,7 +95,7 @@ public class MethodCallThroughputAnalysis15 extends Analysis {
 		final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
 		final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects);
 
-		final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
+		final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
 		pipeline.setFirstStage(objectProducer);
 		pipeline.addIntermediateStage(startTimestampFilter);
 		pipeline.addIntermediateStages(noopFilters);
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java
index f204497b5c2bac8bea7baf36936bd73994114ba9..ba7cd4357fa52fe3f4d58484a4e038688798cbcf 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java
@@ -22,7 +22,7 @@ import java.util.List;
 import teetime.util.ConstructorClosure;
 import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
 import teetime.variant.explicitScheduling.framework.core.Analysis;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
@@ -59,7 +59,7 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
 	@Override
 	public void init() {
 		super.init();
-		Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects,
+		HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects,
 				this.inputObjectCreator);
 		this.producerThread = new Thread(new RunnableStage(producerPipeline));
 
@@ -70,17 +70,17 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
 			List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects);
 			this.timestampObjectsList.add(resultList);
 
-			Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> workerPipeline = this.buildPipeline(producerPipeline, resultList);
+			HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> workerPipeline = this.buildPipeline(producerPipeline, resultList);
 			this.workerThreads[i] = new Thread(new RunnableStage(workerPipeline));
 		}
 	}
 
-	private Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects,
+	private HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects,
 			final ConstructorClosure<TimestampObject> inputObjectCreator) {
 		final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator);
 		Distributor<TimestampObject> distributor = new Distributor<TimestampObject>();
 
-		final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>();
+		final HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>();
 		pipeline.setFirstStage(objectProducer);
 		pipeline.setLastStage(distributor);
 
@@ -93,8 +93,8 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
 	 * @param numNoopFilters
 	 * @since 1.10
 	 */
-	private Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(
-			final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> previousStage,
+	private HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(
+			final HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> previousStage,
 			final List<TimestampObject> timestampObjects) {
 		Relay<TimestampObject> relay = new Relay<TimestampObject>();
 		@SuppressWarnings("unchecked")
@@ -107,7 +107,7 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
 		final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
 		final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
 
-		final Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
+		final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
 		pipeline.setFirstStage(relay);
 		pipeline.addIntermediateStage(startTimestampFilter);
 		pipeline.addIntermediateStages(noopFilters);
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java
index 9a4c84ae7838a29932e653b9ad3f234d33cdb8d8..a5ac5b398c84d16936c3cb10118338cb820b6f43 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java
@@ -22,7 +22,7 @@ import java.util.List;
 import teetime.util.ConstructorClosure;
 import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
 import teetime.variant.explicitScheduling.framework.core.Analysis;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
@@ -113,7 +113,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
 		Sink<TimestampObject> sink = new Sink<TimestampObject>();
 		Sink<Void> endStage = new Sink<Void>();
 
-		final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>();
+		final HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>();
 		pipeline.setFirstStage(objectProducer);
 		// pipeline.setFirstStage(sink);
 		// pipeline.setFirstStage(endStage);
@@ -147,7 +147,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
 		final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
 		final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
 
-		final Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
+		final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
 		pipeline.setFirstStage(relay);
 		pipeline.addIntermediateStage(startTimestampFilter);
 		pipeline.addIntermediateStages(noopFilters);
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java
index cb95597bfcce10321cf747c949515ea619a19952..5307b5bf0ed220d4942a2f6883269ff8cd8d2c1e 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java
@@ -22,7 +22,7 @@ import java.util.List;
 import teetime.util.ConstructorClosure;
 import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
 import teetime.variant.explicitScheduling.framework.core.Analysis;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
@@ -60,7 +60,7 @@ public class MethodCallThroughputAnalysis18 extends Analysis {
 	@Override
 	public void init() {
 		super.init();
-		Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects,
+		HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects,
 				this.inputObjectCreator);
 		this.producerThread = new Thread(new RunnableStage(producerPipeline));
 
@@ -76,12 +76,12 @@ public class MethodCallThroughputAnalysis18 extends Analysis {
 		}
 	}
 
-	private Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects,
+	private HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects,
 			final ConstructorClosure<TimestampObject> inputObjectCreator) {
 		final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator);
 		Distributor<TimestampObject> distributor = new Distributor<TimestampObject>();
 
-		final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>();
+		final HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>();
 		pipeline.setFirstStage(objectProducer);
 		pipeline.setLastStage(distributor);
 
@@ -94,8 +94,8 @@ public class MethodCallThroughputAnalysis18 extends Analysis {
 	 * @param numNoopFilters
 	 * @since 1.10
 	 */
-	private Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(
-			final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> previousStage,
+	private HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(
+			final HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> previousStage,
 			final List<TimestampObject> timestampObjects) {
 		Relay<TimestampObject> relay = new Relay<TimestampObject>();
 		@SuppressWarnings("unchecked")
@@ -108,7 +108,7 @@ public class MethodCallThroughputAnalysis18 extends Analysis {
 		final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
 		final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
 
-		final Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
+		final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
 		pipeline.setFirstStage(relay);
 		pipeline.addIntermediateStage(startTimestampFilter);
 		pipeline.addIntermediateStages(noopFilters);
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java
index f31ad97eb15ebb506ba6fc07b99fa2c359227ab9..9d16a88b5707c12a8d26efaf478606111e29898f 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java
@@ -22,7 +22,7 @@ import java.util.List;
 import teetime.util.ConstructorClosure;
 import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
 import teetime.variant.explicitScheduling.framework.core.Analysis;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipe;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
@@ -59,7 +59,7 @@ public class MethodCallThroughputAnalysis19 extends Analysis {
 	@Override
 	public void init() {
 		super.init();
-		Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects,
+		HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects,
 				this.inputObjectCreator);
 		this.producerThread = new Thread(new RunnableStage(producerPipeline));
 
@@ -76,12 +76,12 @@ public class MethodCallThroughputAnalysis19 extends Analysis {
 
 	}
 
-	private Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects,
+	private HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects,
 			final ConstructorClosure<TimestampObject> inputObjectCreator) {
 		final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator);
 		Distributor<TimestampObject> distributor = new Distributor<TimestampObject>();
 
-		final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>();
+		final HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>();
 		pipeline.setFirstStage(objectProducer);
 		pipeline.setLastStage(distributor);
 
@@ -102,7 +102,7 @@ public class MethodCallThroughputAnalysis19 extends Analysis {
 		final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
 		final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
 
-		final Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
+		final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
 		pipeline.setFirstStage(relay);
 		pipeline.addIntermediateStage(startTimestampFilter);
 		pipeline.addIntermediateStages(noopFilters);
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLoggingExplorviz.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLoggingExplorviz.java
index 4e141ed0dd982eef61179f39dcf432c034fc3632..91a6a5aeda5076e4f317d2fab82f5ed9056afece 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLoggingExplorviz.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLoggingExplorviz.java
@@ -1,7 +1,7 @@
 package teetime.variant.methodcallWithPorts.examples.kiekerdays;
 
 import teetime.variant.explicitScheduling.framework.core.Analysis;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
@@ -41,7 +41,7 @@ public class TcpTraceLoggingExplorviz extends Analysis {
 		SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort());
 
 		// create and configure pipeline
-		Pipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>> pipeline = new Pipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>>();
+		HeadPipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>> pipeline = new HeadPipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>>();
 		pipeline.setFirstStage(tcpReader);
 		pipeline.setLastStage(endStage);
 		return tcpReader;
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java
index b6c76cd83262d09df091e81e06cf97e3753bdb94..538c20c1defd3d8453ac18aa535d77d525611a53 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java
@@ -7,7 +7,7 @@ import java.util.List;
 import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
 import teetime.util.concurrent.hashmap.TraceBuffer;
 import teetime.variant.explicitScheduling.framework.core.Analysis;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
@@ -41,7 +41,7 @@ public class TcpTraceReconstruction extends Analysis {
 	@Override
 	public void init() {
 		super.init();
-		Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
+		HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
 		this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
 
 		this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
@@ -53,14 +53,14 @@ public class TcpTraceReconstruction extends Analysis {
 		}
 	}
 
-	private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
+	private HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
 		TCPReader tcpReader = new TCPReader();
 		Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
 
 		SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
 
 		// create and configure pipeline
-		Pipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Distributor<IMonitoringRecord>>();
+		HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Distributor<IMonitoringRecord>>();
 		pipeline.setFirstStage(tcpReader);
 		pipeline.setLastStage(distributor);
 		return pipeline;
@@ -83,7 +83,7 @@ public class TcpTraceReconstruction extends Analysis {
 		SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), endStage.getInputPort());
 
 		// create and configure pipeline
-		Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>();
+		HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>();
 		pipeline.setFirstStage(relay);
 		pipeline.addIntermediateStage(instanceOfFilter);
 		pipeline.addIntermediateStage(traceReconstructionFilter);
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java
index e01c1b521b5bb376b1d65fda743a60f1409227a6..a38bf7d6b00728f847b9ad87f14b9c014f15e8d0 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java
@@ -9,7 +9,7 @@ import java.util.TreeMap;
 import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
 import teetime.util.concurrent.hashmap.TraceBuffer;
 import teetime.variant.explicitScheduling.framework.core.Analysis;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
@@ -49,10 +49,10 @@ public class TcpTraceReduction extends Analysis {
 	@Override
 	public void init() {
 		super.init();
-		Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
+		HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
 		this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
 
-		Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(5000);
+		HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(5000);
 		this.clockThread = new Thread(new RunnableStage(clockStage));
 
 		this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
@@ -64,20 +64,20 @@ public class TcpTraceReduction extends Analysis {
 		}
 	}
 
-	private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
+	private HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
 		TCPReader tcpReader = new TCPReader();
 		Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
 
 		SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
 
 		// create and configure pipeline
-		Pipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Distributor<IMonitoringRecord>>();
+		HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Distributor<IMonitoringRecord>>();
 		pipeline.setFirstStage(tcpReader);
 		pipeline.setLastStage(distributor);
 		return pipeline;
 	}
 
-	private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
+	private HeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
 		Clock clock = new Clock();
 		clock.setInitialDelayInMs(intervalDelayInMs);
 		clock.setIntervalDelayInMs(intervalDelayInMs);
@@ -86,7 +86,7 @@ public class TcpTraceReduction extends Analysis {
 		SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
 
 		// create and configure pipeline
-		Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>();
+		HeadPipeline<Clock, Distributor<Long>> pipeline = new HeadPipeline<Clock, Distributor<Long>>();
 		pipeline.setFirstStage(clock);
 		pipeline.setLastStage(distributor);
 		return pipeline;
@@ -113,7 +113,7 @@ public class TcpTraceReduction extends Analysis {
 		SpScPipe.connect(clockStage.getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
 
 		// create and configure pipeline
-		Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>();
+		HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>();
 		pipeline.setFirstStage(relay);
 		pipeline.addIntermediateStage(instanceOfFilter);
 		pipeline.addIntermediateStage(traceReconstructionFilter);
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java
index a3ccee70df98a3e93fd48cd94266e8948e8cd8d0..6b530cc0070ee07860d060e46afc6f260515f6c7 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java
@@ -20,7 +20,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 import teetime.variant.methodcallWithPorts.framework.core.Configuration;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory;
@@ -58,7 +58,7 @@ public class RecordReaderConfiguration extends Configuration {
 		Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository);
 		CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection);
 
-		final Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>();
+		final HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>();
 		pipeline.setFirstStage(dir2RecordsFilter);
 		pipeline.setLastStage(collector);
 
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java
index dd178e770462daf6b0ce336ab142f005b67b65e6..2d459fd6d40c3f587b06f7e04f7903b6ffd807b6 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java
@@ -3,7 +3,7 @@ package teetime.variant.methodcallWithPorts.examples.traceReading;
 import java.util.List;
 
 import teetime.variant.explicitScheduling.framework.core.Analysis;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
@@ -25,7 +25,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis {
 	private Counter<IMonitoringRecord> recordCounter;
 	private ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputStage;
 
-	private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
+	private HeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
 		Clock clockStage = new Clock();
 		clockStage.setInitialDelayInMs(intervalDelayInMs);
 		clockStage.setIntervalDelayInMs(intervalDelayInMs);
@@ -34,7 +34,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis {
 		SingleElementPipe.connect(clockStage.getOutputPort(), distributor.getInputPort());
 
 		// create and configure pipeline
-		Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>();
+		HeadPipeline<Clock, Distributor<Long>> pipeline = new HeadPipeline<Clock, Distributor<Long>>();
 		pipeline.setFirstStage(clockStage);
 		pipeline.setLastStage(distributor);
 		return pipeline;
@@ -54,7 +54,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis {
 		SpScPipe.connect(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10);
 
 		// create and configure pipeline
-		Pipeline<TCPReader, Sink<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Sink<IMonitoringRecord>>();
+		HeadPipeline<TCPReader, Sink<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Sink<IMonitoringRecord>>();
 		pipeline.setFirstStage(tcpReader);
 		pipeline.addIntermediateStage(this.recordCounter);
 		// pipeline.addIntermediateStage(this.recordThroughputStage);
@@ -66,7 +66,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis {
 	public void init() {
 		super.init();
 
-		Pipeline<Clock, Distributor<Long>> clockPipeline = this.buildClockPipeline(1000);
+		HeadPipeline<Clock, Distributor<Long>> clockPipeline = this.buildClockPipeline(1000);
 		this.clockThread = new Thread(new RunnableStage(clockPipeline));
 
 		StageWithPort tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage());
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java
index 58c3fc74041d58281dccc9ea2ed29013b2362ab6..6ba339ac436b5f1ffb984e5bcc0de192c4357ffd 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java
@@ -6,9 +6,8 @@ import java.util.List;
 import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
 import teetime.util.concurrent.hashmap.TraceBuffer;
 import teetime.variant.explicitScheduling.framework.core.Analysis;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
-import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
 import teetime.variant.methodcallWithPorts.stage.Clock;
@@ -45,17 +44,17 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
 	@Override
 	public void init() {
 		super.init();
-		Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
+		HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
 		this.clockThread = new Thread(new RunnableStage(clockStage));
 
-		Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000);
+		HeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000);
 		this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
 
-		StageWithPort pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage());
+		HeadPipeline<?, ?> pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage());
 		this.workerThread = new Thread(new RunnableStage(pipeline));
 	}
 
-	private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
+	private HeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
 		Clock clock = new Clock();
 		clock.setIntervalDelayInMs(intervalDelayInMs);
 		Distributor<Long> distributor = new Distributor<Long>();
@@ -63,13 +62,13 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
 		SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
 
 		// create and configure pipeline
-		Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>();
+		HeadPipeline<Clock, Distributor<Long>> pipeline = new HeadPipeline<Clock, Distributor<Long>>();
 		pipeline.setFirstStage(clock);
 		pipeline.setLastStage(distributor);
 		return pipeline;
 	}
 
-	private StageWithPort buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) {
+	private HeadPipeline<TCPReader, Sink<TraceEventRecords>> buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) {
 		// create stages
 		TCPReader tcpReader = new TCPReader();
 		this.recordCounter = new Counter<IMonitoringRecord>();
@@ -96,14 +95,8 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
 		SpScPipe.connect(clock2Stage.getNewOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10);
 
 		// create and configure pipeline
-		Pipeline<TCPReader, Sink<TraceEventRecords>> pipeline = new Pipeline<TCPReader, Sink<TraceEventRecords>>();
+		HeadPipeline<TCPReader, Sink<TraceEventRecords>> pipeline = new HeadPipeline<TCPReader, Sink<TraceEventRecords>>();
 		pipeline.setFirstStage(tcpReader);
-		pipeline.addIntermediateStage(this.recordCounter);
-		pipeline.addIntermediateStage(instanceOfFilter);
-		// pipeline.addIntermediateStage(this.recordThroughputFilter);
-		pipeline.addIntermediateStage(traceReconstructionFilter);
-		pipeline.addIntermediateStage(this.traceThroughputFilter);
-		pipeline.addIntermediateStage(this.traceCounter);
 		pipeline.setLastStage(endStage);
 		return pipeline;
 	}
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java
index 5b9bce4c53589173f00d08ea9ff0f36fa17c061e..9ff7290271d554783e20c329ba46faffb4448e3d 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java
@@ -7,7 +7,7 @@ import java.util.List;
 import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
 import teetime.util.concurrent.hashmap.TraceBuffer;
 import teetime.variant.explicitScheduling.framework.core.Analysis;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
@@ -104,7 +104,7 @@ public class TraceReconstructionAnalysis extends Analysis {
 		dir2RecordsFilter.getInputPort().getPipe().add(this.inputDir);
 
 		// create and configure pipeline
-		Pipeline<Dir2RecordsFilter, CollectorSink<TraceEventRecords>> pipeline = new Pipeline<Dir2RecordsFilter, CollectorSink<TraceEventRecords>>();
+		HeadPipeline<Dir2RecordsFilter, CollectorSink<TraceEventRecords>> pipeline = new HeadPipeline<Dir2RecordsFilter, CollectorSink<TraceEventRecords>>();
 		pipeline.setFirstStage(dir2RecordsFilter);
 		pipeline.addIntermediateStage(this.recordCounter);
 		pipeline.addIntermediateStage(cache);
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java
index 9d17509d5097e97ea7bb6c1c92cecef8e05528f9..650b54c557b0a6790c266882524b3c4049d78b05 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java
@@ -9,7 +9,7 @@ import java.util.List;
 import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
 import teetime.util.concurrent.hashmap.TraceBuffer;
 import teetime.variant.methodcallWithPorts.framework.core.Configuration;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
@@ -72,36 +72,37 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Conf
 	}
 
 	public void buildConfiguration() {
-		final Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
+		final HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
 		this.getFiniteProducerStages().add(tcpPipeline);
 
-		final Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
+		final HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
 		this.getInfiniteProducerStages().add(clockStage);
 
-		final Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000);
+		final HeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000);
 		this.getInfiniteProducerStages().add(clock2Stage);
 
 		this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
 		for (int i = 0; i < this.numWorkerThreads; i++) {
-			StageWithPort pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), clock2Stage.getLastStage());
+			HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(),
+					clock2Stage.getLastStage());
 			this.getConsumerStages().add(pipeline);
 		}
 	}
 
-	private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
+	private HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
 		TCPReader tcpReader = new TCPReader();
 		Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
 
 		SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
 
 		// create and configure pipeline
-		Pipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Distributor<IMonitoringRecord>>("TCP reader pipeline");
+		HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Distributor<IMonitoringRecord>>("TCP reader pipeline");
 		pipeline.setFirstStage(tcpReader);
 		pipeline.setLastStage(distributor);
 		return pipeline;
 	}
 
-	private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
+	private HeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
 		Clock clock = new Clock();
 		clock.setInitialDelayInMs(intervalDelayInMs);
 		clock.setIntervalDelayInMs(intervalDelayInMs);
@@ -110,7 +111,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Conf
 		SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
 
 		// create and configure pipeline
-		Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>();
+		HeadPipeline<Clock, Distributor<Long>> pipeline = new HeadPipeline<Clock, Distributor<Long>>();
 		pipeline.setFirstStage(clock);
 		pipeline.setLastStage(distributor);
 		return pipeline;
@@ -146,7 +147,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Conf
 		}
 	}
 
-	private StageWithPort buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline,
+	private HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline,
 			final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) {
 		// create stages
 		Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
@@ -182,16 +183,9 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Conf
 		SingleElementPipe.connect(traceCounter.getOutputPort(), endStage.getInputPort());
 
 		// create and configure pipeline
-		Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>("Worker pipeline");
+		HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(
+				"Worker pipeline");
 		pipeline.setFirstStage(relay);
-		pipeline.addIntermediateStage(recordCounter);
-		pipeline.addIntermediateStage(recordThroughputFilter);
-		pipeline.addIntermediateStage(traceMetadataCounter);
-		pipeline.addIntermediateStage(instanceOfFilter);
-		// pipeline.addIntermediateStage(this.recordThroughputFilter);
-		pipeline.addIntermediateStage(traceReconstructionFilter);
-		// pipeline.addIntermediateStage(traceThroughputFilter);
-		pipeline.addIntermediateStage(traceCounter);
 		// pipeline.addIntermediateStage(sysout);
 		pipeline.setLastStage(endStage);
 
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
index 3f1d0ae1f45280256f9a30569c176ceac84e8ae2..3f6564aeb15341945a9a8d41b5e1877dfe164a7b 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
@@ -11,7 +11,7 @@ import java.util.TreeMap;
 import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
 import teetime.util.concurrent.hashmap.TraceBuffer;
 import teetime.variant.explicitScheduling.framework.core.Analysis;
-import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
@@ -55,38 +55,38 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
 	@Override
 	public void init() {
 		super.init();
-		Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
+		HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
 		this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
 
-		Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
+		HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
 		this.clockThread = new Thread(new RunnableStage(clockStage));
 
-		Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(5000);
+		HeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(5000);
 		this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
 
 		this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
 		this.workerThreads = new Thread[this.numWorkerThreads];
 
 		for (int i = 0; i < this.workerThreads.length; i++) {
-			StageWithPort pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
+			HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
 			this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
 		}
 	}
 
-	private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
+	private HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
 		TCPReader tcpReader = new TCPReader();
 		Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
 
 		SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
 
 		// create and configure pipeline
-		Pipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Distributor<IMonitoringRecord>>();
+		HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Distributor<IMonitoringRecord>>();
 		pipeline.setFirstStage(tcpReader);
 		pipeline.setLastStage(distributor);
 		return pipeline;
 	}
 
-	private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
+	private HeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
 		Clock clock = new Clock();
 		clock.setInitialDelayInMs(intervalDelayInMs);
 		clock.setIntervalDelayInMs(intervalDelayInMs);
@@ -95,7 +95,7 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
 		SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
 
 		// create and configure pipeline
-		Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>();
+		HeadPipeline<Clock, Distributor<Long>> pipeline = new HeadPipeline<Clock, Distributor<Long>>();
 		pipeline.setFirstStage(clock);
 		pipeline.setLastStage(distributor);
 		return pipeline;
@@ -155,9 +155,9 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
 		}
 	}
 
-	private Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline(final Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpReaderPipeline,
-			final Pipeline<Clock, Distributor<Long>> clockStage,
-			final Pipeline<Clock, Distributor<Long>> clock2Stage) {
+	private HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline(final HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpReaderPipeline,
+			final HeadPipeline<Clock, Distributor<Long>> clockStage,
+			final HeadPipeline<Clock, Distributor<Long>> clock2Stage) {
 		// create stages
 		Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
 		Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
@@ -190,15 +190,8 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
 		SpScPipe.connect(clockStage.getLastStage().getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
 
 		// create and configure pipeline
-		Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>();
+		HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>();
 		pipeline.setFirstStage(relay);
-		pipeline.addIntermediateStage(recordCounter);
-		pipeline.addIntermediateStage(traceMetadataCounter);
-		pipeline.addIntermediateStage(instanceOfFilter);
-		pipeline.addIntermediateStage(traceReconstructionFilter);
-		pipeline.addIntermediateStage(traceReductionFilter);
-		pipeline.addIntermediateStage(traceCounter);
-		pipeline.addIntermediateStage(traceThroughputFilter);
 		pipeline.setLastStage(endStage);
 		return pipeline;
 	}
diff --git a/submodules/JCTools b/submodules/JCTools
index 75998aa20b7ec897ec321c1f94192de888f2dc6e..88e1e25f9519b250258c7e5ada30935975ab2d10 160000
--- a/submodules/JCTools
+++ b/submodules/JCTools
@@ -1 +1 @@
-Subproject commit 75998aa20b7ec897ec321c1f94192de888f2dc6e
+Subproject commit 88e1e25f9519b250258c7e5ada30935975ab2d10