diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java
index cfc6ad91406e2d99595e44c26f6a7ad0f9094ebd..a9e06608edfb3443b7e9c708776ae2ce76cb0976 100644
--- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java
+++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java
@@ -15,43 +15,38 @@
  */
 package teetime.framework;
 
-import java.lang.Thread.State;
 import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 
 import org.jctools.queues.QueueFactory;
 import org.jctools.queues.spec.ConcurrentQueueSpec;
 import org.jctools.queues.spec.Ordering;
 import org.jctools.queues.spec.Preference;
-import org.slf4j.LoggerFactory;
 
 import teetime.framework.signal.ISignal;
+import teetime.util.concurrent.queue.PCBlockingQueue;
+import teetime.util.concurrent.queue.putstrategy.PutStrategy;
+import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
+import teetime.util.concurrent.queue.takestrategy.SCParkTakeStrategy;
+import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
 
 public abstract class AbstractInterThreadPipe extends AbstractPipe {
 
-	private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractInterThreadPipe.class);
+	private final BlockingQueue<ISignal> signalQueue;
 
-	private final Queue<ISignal> signalQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
+	private volatile boolean isClosed;
 
 	protected <T> AbstractInterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
 		super(sourcePort, targetPort);
+		final Queue<ISignal> localSignalQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
+		final PutStrategy<ISignal> putStrategy = new YieldPutStrategy<ISignal>();
+		final TakeStrategy<ISignal> takeStrategy = new SCParkTakeStrategy<ISignal>();
+		signalQueue = new PCBlockingQueue<ISignal>(localSignalQueue, putStrategy, takeStrategy);
 	}
 
 	@Override
 	public void sendSignal(final ISignal signal) {
 		this.signalQueue.offer(signal);
-
-		Thread owningThread = cachedTargetStage.getOwningThread();
-		if (owningThread == null && LOGGER.isWarnEnabled()) {
-			LOGGER.warn("owningThread of " + cachedTargetStage + " is null.");
-		}
-		if (null != owningThread && isThreadWaiting(owningThread)) { // FIXME remove the null check for performance
-			owningThread.interrupt();
-		}
-	}
-
-	protected final boolean isThreadWaiting(final Thread thread) {
-		final State state = thread.getState(); // store state in variable for performance reasons
-		return state == State.WAITING || state == State.TIMED_WAITING;
 	}
 
 	/**
@@ -67,4 +62,20 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe {
 	public void reportNewElement() { // NOPMD
 		// do nothing
 	}
+
+	@Override
+	public final void waitForStartSignal() throws InterruptedException {
+		final ISignal signal = signalQueue.take();
+		signal.trigger(getTargetPort().getOwningStage());
+	}
+
+	@Override
+	public final boolean isClosed() {
+		return isClosed;
+	}
+
+	@Override
+	public final void close() {
+		isClosed = true;
+	}
 }
diff --git a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java
index d66714ad5484d3ea5e02b2a37348819a6a7947dc..bde2cba9ae7c03ef996f6c05901b55d027f2a6b4 100644
--- a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java
+++ b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java
@@ -19,6 +19,8 @@ import teetime.framework.signal.ISignal;
 
 public abstract class AbstractIntraThreadPipe extends AbstractPipe {
 
+	private boolean isClosed;
+
 	protected <T> AbstractIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
 		super(sourcePort, targetPort);
 	}
@@ -34,4 +36,19 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe {
 		this.cachedTargetStage.executeWithPorts();
 	}
 
+	@Override
+	public boolean isClosed() {
+		return isClosed;
+	}
+
+	@Override
+	public void close() {
+		isClosed = true;
+	}
+
+	@SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract")
+	@Override
+	public void waitForStartSignal() throws InterruptedException {
+		// do nothing
+	}
 }
diff --git a/src/main/java/teetime/framework/AbstractPipe.java b/src/main/java/teetime/framework/AbstractPipe.java
index 7ceceae3e2b917f42126f16169e8b2b9c48731f4..e0642ea8aa824667aaead75bfa89f94374965260 100644
--- a/src/main/java/teetime/framework/AbstractPipe.java
+++ b/src/main/java/teetime/framework/AbstractPipe.java
@@ -56,4 +56,8 @@ public abstract class AbstractPipe implements IPipe {
 		this.cachedTargetStage = targetPort.getOwningStage();
 	}
 
+	@Override
+	public final boolean hasMore() {
+		return !isEmpty();
+	}
 }
diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java
index 0b26490fbd0ce76deef9dda945606eda9f76277f..6b6b04d24437b3b1fa50449c824e28d369543da7 100644
--- a/src/main/java/teetime/framework/AbstractRunnableStage.java
+++ b/src/main/java/teetime/framework/AbstractRunnableStage.java
@@ -49,12 +49,14 @@ abstract class AbstractRunnableStage implements Runnable {
 		} catch (RuntimeException e) {
 			this.logger.error("Terminating thread due to the following exception: ", e);
 			throw e;
+		} catch (InterruptedException e) {
+			this.logger.error("Terminating thread due to the following exception: ", e);
 		}
 
 		this.logger.debug("Finished runnable stage. (" + stage.getId() + ")");
 	}
 
-	protected abstract void beforeStageExecution(Stage stage);
+	protected abstract void beforeStageExecution(Stage stage) throws InterruptedException;
 
 	protected abstract void executeStage(Stage stage);
 
diff --git a/src/main/java/teetime/framework/CompositeStage.java b/src/main/java/teetime/framework/CompositeStage.java
index 210ecd4c6e685ac8bfd3cd5da978b4a55f46e47d..f71034daede673f2b84ef2340f5005caebd8b95d 100644
--- a/src/main/java/teetime/framework/CompositeStage.java
+++ b/src/main/java/teetime/framework/CompositeStage.java
@@ -98,4 +98,23 @@ public abstract class CompositeStage extends Stage {
 		INTRA_PIPE_FACTORY.create(out, in);
 	}
 
+	@Override
+	public final Thread getOwningThread() {
+		return getFirstStage().getOwningThread();
+	}
+
+	@Override
+	public final void onValidating(final List<InvalidPortConnection> invalidPortConnections) {
+		getFirstStage().onValidating(invalidPortConnections);
+	}
+
+	@Override
+	public final void onStarting() throws Exception {
+		getFirstStage().onStarting();
+	}
+
+	@Override
+	public final void onTerminating() throws Exception {
+		getFirstStage().onTerminating();
+	}
 }
diff --git a/src/main/java/teetime/framework/InputPort.java b/src/main/java/teetime/framework/InputPort.java
index f5bba1f053030fc1b14db59911ab38584336a8e9..d19f2500d3c9aed63f1330e0804dc6244f3e38a2 100644
--- a/src/main/java/teetime/framework/InputPort.java
+++ b/src/main/java/teetime/framework/InputPort.java
@@ -37,4 +37,12 @@ public final class InputPort<T> extends AbstractPort<T> {
 		return this.owningStage;
 	}
 
+	public boolean isClosed() {
+		return pipe.isClosed() && !pipe.hasMore();
+	}
+
+	public void waitForStartSignal() throws InterruptedException {
+		pipe.waitForStartSignal();
+	}
+
 }
diff --git a/src/main/java/teetime/framework/OutputPort.java b/src/main/java/teetime/framework/OutputPort.java
index 1c49bfc6c471909741d38e4c2c29914a3a876fe0..99204663c1badcb60bb58f6e011c844d7052db16 100644
--- a/src/main/java/teetime/framework/OutputPort.java
+++ b/src/main/java/teetime/framework/OutputPort.java
@@ -16,6 +16,7 @@
 package teetime.framework;
 
 import teetime.framework.signal.ISignal;
+import teetime.framework.signal.TerminatingSignal;
 
 public final class OutputPort<T> extends AbstractPort<T> {
 
@@ -37,7 +38,10 @@ public final class OutputPort<T> extends AbstractPort<T> {
 	 *            to be sent; May not be <code>null</code>.
 	 */
 	public void sendSignal(final ISignal signal) {
-		this.pipe.sendSignal(signal);
+		if (signal instanceof TerminatingSignal) {
+			pipe.close();
+		}
+		pipe.sendSignal(signal);
 	}
 
 }
diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java
index 3fc6a173d79b21d0ca75f7e0d4b03bbdf0390854..898a883f5cbcf34fc679832e0b1679e14b88c29b 100644
--- a/src/main/java/teetime/framework/RunnableConsumerStage.java
+++ b/src/main/java/teetime/framework/RunnableConsumerStage.java
@@ -17,12 +17,11 @@ package teetime.framework;
 
 import teetime.framework.idle.IdleStrategy;
 import teetime.framework.idle.YieldStrategy;
-import teetime.framework.pipe.IPipe;
 import teetime.framework.signal.ISignal;
+import teetime.framework.signal.TerminatingSignal;
 
 final class RunnableConsumerStage extends AbstractRunnableStage {
 
-	private final IdleStrategy idleStrategy;
 	// cache the input ports here since getInputPorts() always returns a new copy
 	private final InputPort<?>[] inputPorts;
 
@@ -38,18 +37,16 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
 
 	public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy) {
 		super(stage);
-		this.idleStrategy = idleStrategy;
 		this.inputPorts = stage.getInputPorts(); // FIXME should getInputPorts() really be defined in Stage?
 	}
 
 	@Override
-	protected void beforeStageExecution(final Stage stage) {
+	protected void beforeStageExecution(final Stage stage) throws InterruptedException {
 		logger.trace("ENTRY beforeStageExecution");
 
-		do {
-			checkforSignals(stage);
-			Thread.yield();
-		} while (!stage.isStarted());
+		for (InputPort<?> inputPort : inputPorts) {
+			inputPort.waitForStartSignal();
+		}
 
 		logger.trace("EXIT beforeStageExecution");
 	}
@@ -59,33 +56,20 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
 		try {
 			stage.executeWithPorts();
 		} catch (NotEnoughInputException e) {
-			checkforSignals(stage); // check for termination
-			executeIdleStrategy(stage);
+			checkForTerminationSignal(stage);
 		}
 	}
 
-	private void executeIdleStrategy(final Stage stage) {
-		if (stage.shouldBeTerminated()) {
-			return;
-		}
-		try {
-			idleStrategy.execute();
-		} catch (InterruptedException e) {
-			// checkforSignals(); // check for termination
+	private void checkForTerminationSignal(final Stage stage) {
+		for (InputPort<?> inputPort : inputPorts) {
+			if (!inputPort.isClosed()) {
+				return;
+			}
 		}
-	}
 
-	@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
-	private void checkforSignals(final Stage stage) {
+		final ISignal signal = new TerminatingSignal();
 		for (InputPort<?> inputPort : inputPorts) {
-			final IPipe pipe = inputPort.getPipe();
-			if (pipe instanceof AbstractInterThreadPipe) { // TODO: is this needed?
-				final AbstractInterThreadPipe intraThreadPipe = (AbstractInterThreadPipe) pipe;
-				final ISignal signal = intraThreadPipe.getSignal();
-				if (null != signal) {
-					stage.onSignal(signal, inputPort);
-				}
-			}
+			stage.onSignal(signal, inputPort);
 		}
 	}
 
diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java
index 27f7a11be99507826b20265a20215509251c7154..08f27d4c2936771c592a51e02931d2dbc1bafd1f 100644
--- a/src/main/java/teetime/framework/Stage.java
+++ b/src/main/java/teetime/framework/Stage.java
@@ -118,4 +118,12 @@ public abstract class Stage {
 
 	protected abstract boolean isStarted();
 
+	// events
+
+	public abstract void onValidating(List<InvalidPortConnection> invalidPortConnections);
+
+	public abstract void onStarting() throws Exception;
+
+	public abstract void onTerminating() throws Exception;
+
 }
diff --git a/src/main/java/teetime/framework/pipe/CommittablePipe.java b/src/main/java/teetime/framework/pipe/CommittablePipe.java
index 0d8146671627897c87b59c97c778558a8f427a76..bb76738765ccd9444effd4a6288996f0704c6ca8 100644
--- a/src/main/java/teetime/framework/pipe/CommittablePipe.java
+++ b/src/main/java/teetime/framework/pipe/CommittablePipe.java
@@ -34,11 +34,6 @@ public final class CommittablePipe extends AbstractIntraThreadPipe {
 		pipe.connectPorts(sourcePort, targetPort);
 	}
 
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see teetime.examples.throughput.methodcall.IPipe#add(T)
-	 */
 	@Override
 	public boolean add(final Object element) {
 		this.elements.addToTailUncommitted(element);
@@ -46,11 +41,6 @@ public final class CommittablePipe extends AbstractIntraThreadPipe {
 		return true;
 	}
 
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see teetime.examples.throughput.methodcall.IPipe#removeLast()
-	 */
 	@Override
 	public Object removeLast() {
 		final Object element = this.elements.removeFromHeadUncommitted();
@@ -58,26 +48,11 @@ public final class CommittablePipe extends AbstractIntraThreadPipe {
 		return element;
 	}
 
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see teetime.examples.throughput.methodcall.IPipe#isEmpty()
-	 */
 	@Override
 	public boolean isEmpty() {
 		return this.elements.isEmpty();
 	}
 
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see teetime.examples.throughput.methodcall.IPipe#readLast()
-	 */
-	@Override
-	public Object readLast() {
-		return this.elements.getTail();
-	}
-
 	public CommittableResizableArrayQueue<?> getElements() {
 		return this.elements;
 	}
diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java
index 3b89683b00cd3f542feb741c8c450a483e643473..0ca1ea9b07584f9b768a93a5b8a65381fb03a71f 100644
--- a/src/main/java/teetime/framework/pipe/DummyPipe.java
+++ b/src/main/java/teetime/framework/pipe/DummyPipe.java
@@ -50,11 +50,6 @@ public final class DummyPipe implements IPipe {
 		return 0;
 	}
 
-	@Override
-	public Object readLast() {
-		return null;
-	}
-
 	@Override
 	public InputPort<Object> getTargetPort() {
 		return null;
@@ -71,4 +66,28 @@ public final class DummyPipe implements IPipe {
 		// do nothing
 	}
 
+	@Override
+	public boolean isClosed() {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	@Override
+	public boolean hasMore() {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	@Override
+	public void waitForStartSignal() throws InterruptedException {
+		// TODO Auto-generated method stub
+
+	}
+
+	@Override
+	public void close() {
+		// TODO Auto-generated method stub
+
+	}
+
 }
diff --git a/src/main/java/teetime/framework/pipe/IPipe.java b/src/main/java/teetime/framework/pipe/IPipe.java
index 2ba08ad4993b3fa6bb399fd6b53f5bd8247da8e8..5db7c3ef3a333ed4ce5b61e50c6df4fb457375f5 100644
--- a/src/main/java/teetime/framework/pipe/IPipe.java
+++ b/src/main/java/teetime/framework/pipe/IPipe.java
@@ -54,13 +54,6 @@ public interface IPipe {
 	 */
 	Object removeLast();
 
-	/**
-	 * Reads the pipe's last element, but does not delete it.
-	 *
-	 * @return The last element in the pipe.
-	 */
-	Object readLast();
-
 	/**
 	 * Retrieves the receiving port.
 	 *
@@ -84,4 +77,14 @@ public interface IPipe {
 	 */
 	void reportNewElement();
 
+	boolean isClosed();
+
+	boolean hasMore();
+
+	// "signal" handling
+
+	void waitForStartSignal() throws InterruptedException;
+
+	void close();
+
 }
diff --git a/src/main/java/teetime/framework/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/framework/pipe/OrderedGrowableArrayPipe.java
index a8fe52049d8fcdda57f16d87c0e0b101fc84351f..0703849785d3687db668a7a1ddc9b25f16cb882f 100644
--- a/src/main/java/teetime/framework/pipe/OrderedGrowableArrayPipe.java
+++ b/src/main/java/teetime/framework/pipe/OrderedGrowableArrayPipe.java
@@ -57,11 +57,6 @@ public final class OrderedGrowableArrayPipe extends AbstractIntraThreadPipe {
 		return this.size() == 0;
 	}
 
-	@Override
-	public Object readLast() {
-		return this.elements.get(this.head);
-	}
-
 	@Override
 	public int size() {
 		return this.tail - this.head;
diff --git a/src/main/java/teetime/framework/pipe/OrderedGrowablePipe.java b/src/main/java/teetime/framework/pipe/OrderedGrowablePipe.java
index d29abbbf3a4cf98381511e539a4966d0dcb539b6..b62481cefab7cc9957974040865c40beeef49d59 100644
--- a/src/main/java/teetime/framework/pipe/OrderedGrowablePipe.java
+++ b/src/main/java/teetime/framework/pipe/OrderedGrowablePipe.java
@@ -51,11 +51,6 @@ public class OrderedGrowablePipe extends AbstractIntraThreadPipe {
 		return this.elements.isEmpty();
 	}
 
-	@Override
-	public Object readLast() {
-		return this.elements.peek();
-	}
-
 	@Override
 	public int size() {
 		return this.elements.size();
diff --git a/src/main/java/teetime/framework/pipe/RelayTestPipe.java b/src/main/java/teetime/framework/pipe/RelayTestPipe.java
index a322449dd8b64520b5154b0bc3fbc51e7e1068cb..3b112db9ddd9ab4b966bc3f406f32ebe7690a99b 100644
--- a/src/main/java/teetime/framework/pipe/RelayTestPipe.java
+++ b/src/main/java/teetime/framework/pipe/RelayTestPipe.java
@@ -54,9 +54,4 @@ public final class RelayTestPipe<T> extends AbstractInterThreadPipe {
 		return this.numInputObjects;
 	}
 
-	@Override
-	public T readLast() {
-		return null;
-	}
-
 }
diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipe.java b/src/main/java/teetime/framework/pipe/SingleElementPipe.java
index c6d519496b9b3862e7ab6b5475f8d853a576ed35..c24dd42f1cda55a15a64ea60af227207bdb72698 100644
--- a/src/main/java/teetime/framework/pipe/SingleElementPipe.java
+++ b/src/main/java/teetime/framework/pipe/SingleElementPipe.java
@@ -52,11 +52,6 @@ public final class SingleElementPipe extends AbstractIntraThreadPipe {
 		return this.element == null;
 	}
 
-	@Override
-	public Object readLast() {
-		return this.element;
-	}
-
 	@Override
 	public int size() {
 		return (this.element == null) ? 0 : 1;
diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java
index dc391f352ab9a9232df92e6eefed5cdc6f294bba..c1bb069433ea68ea34034ebdc7f0cae211b9390f 100644
--- a/src/main/java/teetime/framework/pipe/SpScPipe.java
+++ b/src/main/java/teetime/framework/pipe/SpScPipe.java
@@ -70,11 +70,6 @@ public final class SpScPipe extends AbstractInterThreadPipe {
 		return this.queue.size();
 	}
 
-	@Override
-	public Object readLast() {
-		return this.queue.peek();
-	}
-
 	// BETTER find a solution w/o any thread-safe code in this stage
 	public synchronized int getNumWaits() {
 		return this.numWaits;
diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
index 07f906a8658edac939929f27249d3ec4a12ccf80..424e6b494812c8cfd548d630d10be832b2861bf5 100644
--- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
+++ b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
@@ -56,9 +56,4 @@ public final class UnboundedSpScPipe extends AbstractInterThreadPipe {
 		return this.queue.size();
 	}
 
-	@Override
-	public Object readLast() {
-		return this.queue.peek();
-	}
-
 }
diff --git a/src/main/java/teetime/framework/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/framework/pipe/UnorderedGrowablePipe.java
index dc55ccd52b0f8ecc5a4168480d580519bd8dcf97..8d2c86b7b4155c41e58d241994f53e7ea99209f1 100644
--- a/src/main/java/teetime/framework/pipe/UnorderedGrowablePipe.java
+++ b/src/main/java/teetime/framework/pipe/UnorderedGrowablePipe.java
@@ -63,12 +63,6 @@ public final class UnorderedGrowablePipe extends AbstractIntraThreadPipe {
 		return this.lastFreeIndex == 0;
 	}
 
-	@Override
-	public Object readLast() {
-		return this.elements[this.lastFreeIndex - 1];
-		// return this.elements.get(this.lastFreeIndex - 1);
-	}
-
 	@Override
 	public int size() {
 		return this.lastFreeIndex;
diff --git a/src/main/java/teetime/framework/signal/ISignal.java b/src/main/java/teetime/framework/signal/ISignal.java
index ee6d5152fcc0d4ed3f80ffa1922d260e93e702b7..1e0df9fbac46234b85a1638f3245835ddde207b9 100644
--- a/src/main/java/teetime/framework/signal/ISignal.java
+++ b/src/main/java/teetime/framework/signal/ISignal.java
@@ -15,9 +15,9 @@
  */
 package teetime.framework.signal;
 
-import teetime.framework.AbstractStage;
+import teetime.framework.Stage;
 
 public interface ISignal {
 
-	void trigger(AbstractStage stage);
+	void trigger(Stage stage);
 }
diff --git a/src/main/java/teetime/framework/signal/StartingSignal.java b/src/main/java/teetime/framework/signal/StartingSignal.java
index 646443bec562e3a3bcb7851a6f707accee9e3a59..256d7cde879259ac0d1e672cb4aa102003466892 100644
--- a/src/main/java/teetime/framework/signal/StartingSignal.java
+++ b/src/main/java/teetime/framework/signal/StartingSignal.java
@@ -21,7 +21,7 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import teetime.framework.AbstractStage;
+import teetime.framework.Stage;
 
 public final class StartingSignal implements ISignal {
 
@@ -31,7 +31,7 @@ public final class StartingSignal implements ISignal {
 	public StartingSignal() {}
 
 	@Override
-	public void trigger(final AbstractStage stage) {
+	public void trigger(final Stage stage) {
 		try {
 			stage.onStarting();
 		} catch (Exception e) { // NOCS (Stages can throw any arbitrary Exception)
diff --git a/src/main/java/teetime/framework/signal/TerminatingSignal.java b/src/main/java/teetime/framework/signal/TerminatingSignal.java
index d43f21f35741aeba9672a4f0b4e2395106fd5045..4c71e301c41356eaabf1b0d055b3ebf1b3793586 100644
--- a/src/main/java/teetime/framework/signal/TerminatingSignal.java
+++ b/src/main/java/teetime/framework/signal/TerminatingSignal.java
@@ -21,7 +21,7 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import teetime.framework.AbstractStage;
+import teetime.framework.Stage;
 
 public final class TerminatingSignal implements ISignal {
 
@@ -31,7 +31,7 @@ public final class TerminatingSignal implements ISignal {
 	public TerminatingSignal() {}
 
 	@Override
-	public void trigger(final AbstractStage stage) {
+	public void trigger(final Stage stage) {
 		try {
 			stage.onTerminating();
 		} catch (Exception e) { // NOCS (Stages can throw any arbitrary Exception)
diff --git a/src/main/java/teetime/framework/signal/ValidatingSignal.java b/src/main/java/teetime/framework/signal/ValidatingSignal.java
index 51757ca295094a635e37d28df0cf0aee2823d0b7..47137408b68809f79a501e991c38a36306a6a370 100644
--- a/src/main/java/teetime/framework/signal/ValidatingSignal.java
+++ b/src/main/java/teetime/framework/signal/ValidatingSignal.java
@@ -18,7 +18,7 @@ package teetime.framework.signal;
 import java.util.LinkedList;
 import java.util.List;
 
-import teetime.framework.AbstractStage;
+import teetime.framework.Stage;
 import teetime.framework.validation.InvalidPortConnection;
 
 public final class ValidatingSignal implements ISignal {
@@ -28,7 +28,7 @@ public final class ValidatingSignal implements ISignal {
 	public ValidatingSignal() {}
 
 	@Override
-	public void trigger(final AbstractStage stage) {
+	public void trigger(final Stage stage) {
 		stage.onValidating(this.invalidPortConnections);
 	}
 
diff --git a/src/main/java/teetime/util/concurrent/queue/PCBlockingQueue.java b/src/main/java/teetime/util/concurrent/queue/PCBlockingQueue.java
new file mode 100644
index 0000000000000000000000000000000000000000..262257c2d48069df60a44c76658b1ce177102f67
--- /dev/null
+++ b/src/main/java/teetime/util/concurrent/queue/PCBlockingQueue.java
@@ -0,0 +1,188 @@
+package teetime.util.concurrent.queue;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import teetime.util.concurrent.queue.putstrategy.PutStrategy;
+import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
+
+public final class PCBlockingQueue<E> implements BlockingQueue<E> {
+
+	private final Queue<E> q;
+	private final PutStrategy<E> putStrategy;
+	private final TakeStrategy<E> takeStrategy;
+
+	// public PCBlockingQueue(final Queue<E> q, final Class<TakeStrategy<E>> takeStrategyClass, final Class<PutStrategy<E>> putStrategyClass) {
+	public PCBlockingQueue(final Queue<E> q, final PutStrategy<E> putStrategy, final TakeStrategy<E> takeStrategy) {
+		this.q = q;
+		this.putStrategy = putStrategy;
+		this.takeStrategy = takeStrategy;
+	}
+
+	@Override
+	public void put(final E e) throws InterruptedException
+	{
+		putStrategy.backoffOffer(q, e);
+	}
+
+	@Override
+	public E take() throws InterruptedException
+	{
+		return takeStrategy.waitPoll(q);
+	}
+
+	@Override
+	public boolean offer(final E e)
+	{
+		boolean offered = q.offer(e);
+		if (offered) {
+			takeStrategy.signal();
+		}
+		return offered;
+	}
+
+	@Override
+	public E poll()
+	{
+		E e = q.poll();
+		if (e != null) {
+			putStrategy.signal();
+		}
+		return e;
+	}
+
+	@Override
+	public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int remainingCapacity() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int drainTo(final Collection<? super E> c)
+	{
+		int count = 0;
+		E e;
+		while ((e = poll()) != null)
+		{
+			c.add(e);
+			count++;
+		}
+		return count;
+	}
+
+	@Override
+	public int drainTo(final Collection<? super E> c, final int maxElements)
+	{
+		int count = 0;
+		E e;
+		while (((e = poll()) != null) && count < maxElements)
+		{
+			c.add(e);
+			count++;
+		}
+		return count;
+	}
+
+	@Override
+	public boolean add(final E e) {
+		return q.add(e);
+	}
+
+	@Override
+	public int size() {
+		return q.size();
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return q.isEmpty();
+	}
+
+	@Override
+	public boolean contains(final Object o) {
+		return q.contains(o);
+	}
+
+	@Override
+	public Iterator<E> iterator() {
+		return q.iterator();
+	}
+
+	@Override
+	public E remove() {
+		return q.remove();
+	}
+
+	@Override
+	public Object[] toArray() {
+		return q.toArray();
+	}
+
+	@Override
+	public E element() {
+		return q.element();
+	}
+
+	@Override
+	public E peek() {
+		return q.peek();
+	}
+
+	@Override
+	public <T> T[] toArray(final T[] a) {
+		return q.toArray(a);
+	}
+
+	@Override
+	public boolean remove(final Object o) {
+		return q.remove(o);
+	}
+
+	@Override
+	public boolean containsAll(final Collection<?> c) {
+		return q.containsAll(c);
+	}
+
+	@Override
+	public boolean addAll(final Collection<? extends E> c) {
+		return q.addAll(c);
+	}
+
+	@Override
+	public boolean removeAll(final Collection<?> c) {
+		return q.removeAll(c);
+	}
+
+	@Override
+	public boolean retainAll(final Collection<?> c) {
+		return q.retainAll(c);
+	}
+
+	@Override
+	public void clear() {
+		q.clear();
+	}
+
+	@Override
+	public boolean equals(final Object o) {
+		return q.equals(o);
+	}
+
+	@Override
+	public int hashCode() {
+		return q.hashCode();
+	}
+
+}
diff --git a/src/main/java/teetime/util/concurrent/queue/putstrategy/PutStrategy.java b/src/main/java/teetime/util/concurrent/queue/putstrategy/PutStrategy.java
new file mode 100644
index 0000000000000000000000000000000000000000..df20e0f48697e5dd8a48b84dd0ba74b5e30242a5
--- /dev/null
+++ b/src/main/java/teetime/util/concurrent/queue/putstrategy/PutStrategy.java
@@ -0,0 +1,10 @@
+package teetime.util.concurrent.queue.putstrategy;
+
+import java.util.Queue;
+
+public interface PutStrategy<E>
+{
+	void backoffOffer(Queue<E> q, E e);
+
+	void signal();
+}
diff --git a/src/main/java/teetime/util/concurrent/queue/putstrategy/YieldPutStrategy.java b/src/main/java/teetime/util/concurrent/queue/putstrategy/YieldPutStrategy.java
new file mode 100644
index 0000000000000000000000000000000000000000..a92a500064b9f01374bafc92b9478960ee1c2533
--- /dev/null
+++ b/src/main/java/teetime/util/concurrent/queue/putstrategy/YieldPutStrategy.java
@@ -0,0 +1,21 @@
+package teetime.util.concurrent.queue.putstrategy;
+
+import java.util.Queue;
+
+public class YieldPutStrategy<E> implements PutStrategy<E>
+{
+	@Override
+	public void backoffOffer(final Queue<E> q, final E e)
+	{
+		while (!q.offer(e))
+		{
+			Thread.yield();
+		}
+	}
+
+	@Override
+	public void signal()
+	{
+		// Nothing
+	}
+}
diff --git a/src/main/java/teetime/util/concurrent/queue/takestrategy/SCParkTakeStrategy.java b/src/main/java/teetime/util/concurrent/queue/takestrategy/SCParkTakeStrategy.java
new file mode 100644
index 0000000000000000000000000000000000000000..b90843ce5a2e0e777d6a4358626895de4ca3d559
--- /dev/null
+++ b/src/main/java/teetime/util/concurrent/queue/takestrategy/SCParkTakeStrategy.java
@@ -0,0 +1,41 @@
+package teetime.util.concurrent.queue.takestrategy;
+
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
+
+public final class SCParkTakeStrategy<E> implements TakeStrategy<E> {
+
+	public volatile int storeFence = 0;
+
+	private final AtomicReference<Thread> t = new AtomicReference<Thread>(null);
+
+	@Override
+	public void signal()
+	{
+		// Make sure the offer is visible before unpark
+		storeFence = 1; // store barrier
+
+		LockSupport.unpark(t.get()); // t.get() load barrier
+	}
+
+	@Override
+	public E waitPoll(final Queue<E> q) throws InterruptedException
+	{
+		E e = q.poll();
+		if (e != null)
+		{
+			return e;
+		}
+
+		t.set(Thread.currentThread());
+
+		while ((e = q.poll()) == null) {
+			LockSupport.park();
+		}
+
+		t.lazySet(null);
+
+		return e;
+	}
+}
diff --git a/src/main/java/teetime/util/concurrent/queue/takestrategy/TakeStrategy.java b/src/main/java/teetime/util/concurrent/queue/takestrategy/TakeStrategy.java
new file mode 100644
index 0000000000000000000000000000000000000000..f772f864bf946c60886c8069d95e1a5a70d84484
--- /dev/null
+++ b/src/main/java/teetime/util/concurrent/queue/takestrategy/TakeStrategy.java
@@ -0,0 +1,10 @@
+package teetime.util.concurrent.queue.takestrategy;
+
+import java.util.Queue;
+
+public interface TakeStrategy<E>
+{
+	void signal();
+
+	E waitPoll(Queue<E> q) throws InterruptedException;
+}
diff --git a/src/main/java/teetime/util/concurrent/queue/takestrategy/YieldTakeStrategy.java b/src/main/java/teetime/util/concurrent/queue/takestrategy/YieldTakeStrategy.java
new file mode 100644
index 0000000000000000000000000000000000000000..00377facc9a76677b5c8dca6b7cec9963055e488
--- /dev/null
+++ b/src/main/java/teetime/util/concurrent/queue/takestrategy/YieldTakeStrategy.java
@@ -0,0 +1,23 @@
+package teetime.util.concurrent.queue.takestrategy;
+
+import java.util.Queue;
+
+public final class YieldTakeStrategy<E> implements TakeStrategy<E>
+{
+	@Override
+	public void signal()
+	{
+		// Nothing to do
+	}
+
+	@Override
+	public E waitPoll(final Queue<E> q) throws InterruptedException
+	{
+		E e;
+		while ((e = q.poll()) == null)
+		{
+			Thread.yield();
+		}
+		return e;
+	}
+}
diff --git a/src/performancetest/java/teetime/framework/OldPipeline.java b/src/performancetest/java/teetime/framework/OldPipeline.java
index 30ecb4debac6d8c568e95fcf645c8e3a5ff3b733..5ab88cb7cd09ccb228b163927a743582361c205e 100644
--- a/src/performancetest/java/teetime/framework/OldPipeline.java
+++ b/src/performancetest/java/teetime/framework/OldPipeline.java
@@ -15,17 +15,17 @@
  */
 package teetime.framework;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
-import teetime.framework.signal.ISignal;
-import teetime.framework.validation.InvalidPortConnection;
-
 @Deprecated
-public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> extends Stage {
+public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> extends CompositeStage {
 
 	protected FirstStage firstStage;
-	protected LastStage lastStage;
+	private final List<LastStage> lastStages = new ArrayList<LastStage>();
 
+	@Override
 	public FirstStage getFirstStage() {
 		return this.firstStage;
 	}
@@ -34,62 +34,18 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> exte
 		this.firstStage = firstStage;
 	}
 
-	public LastStage getLastStage() {
-		return this.lastStage;
-	}
-
 	public void setLastStage(final LastStage lastStage) {
-		this.lastStage = lastStage;
-	}
-
-	@Override
-	public void executeWithPorts() {
-		this.firstStage.executeWithPorts();
+		this.lastStages.clear();
+		this.lastStages.add(lastStage);
 	}
 
-	@Override
-	public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
-		this.firstStage.onSignal(signal, inputPort);
-	}
-
-	@Override
-	public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
-		this.lastStage.validateOutputPorts(invalidPortConnections);
-	}
-
-	@Override
-	public void terminate() {
-		firstStage.terminate();
-	}
-
-	@Override
-	public boolean shouldBeTerminated() {
-		return firstStage.shouldBeTerminated();
-	}
-
-	@Override
-	protected InputPort<?>[] getInputPorts() {
-		return firstStage.getInputPorts();
-	}
-
-	@Override
-	public Thread getOwningThread() {
-		return firstStage.getOwningThread();
-	}
-
-	@Override
-	void setOwningThread(final Thread owningThread) {
-		firstStage.setOwningThread(owningThread);
-	}
-
-	@Override
-	public TerminationStrategy getTerminationStrategy() {
-		return firstStage.getTerminationStrategy();
+	public LastStage getLastStage() {
+		return lastStages.get(0);
 	}
 
 	@Override
-	protected boolean isStarted() {
-		return firstStage.isStarted();
+	protected Collection<? extends Stage> getLastStages() {
+		return lastStages;
 	}
 
 }
diff --git a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
index 2838b26341a18d9d491e9d03fd7cf48efbdc6755..5cf29ff72592bce09de069099b75406e03df3e35 100644
--- a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
+++ b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
@@ -70,27 +70,43 @@ public class MergerTestingPipe implements IPipe {
 	}
 
 	@Override
-	public Object readLast() {
+	public InputPort<?> getTargetPort() {
 		// TODO Auto-generated method stub
 		return null;
 	}
 
 	@Override
-	public InputPort<?> getTargetPort() {
+	public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
 		// TODO Auto-generated method stub
-		return null;
+
 	}
 
 	@Override
-	public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+	public void reportNewElement() {
 		// TODO Auto-generated method stub
 
 	}
 
 	@Override
-	public void reportNewElement() {
+	public boolean isClosed() {
 		// TODO Auto-generated method stub
+		return false;
+	}
 
+	@Override
+	public boolean hasMore() {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	@Override
+	public void waitForStartSignal() throws InterruptedException {
+		// TODO Auto-generated method stub
+	}
+
+	@Override
+	public void close() {
+		// TODO Auto-generated method stub
 	}
 
 }