From e26045c68f9b7b09b08123873eb0587455e71737 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Mon, 23 Feb 2015 13:47:46 +0100
Subject: [PATCH] changed start and terminate implementation

---
 .../framework/AbstractInterThreadPipe.java    |  45 +++--
 .../framework/AbstractIntraThreadPipe.java    |  17 ++
 .../java/teetime/framework/AbstractPipe.java  |   4 +
 .../framework/AbstractRunnableStage.java      |   4 +-
 .../teetime/framework/CompositeStage.java     |  19 ++
 .../java/teetime/framework/InputPort.java     |   8 +
 .../java/teetime/framework/OutputPort.java    |   6 +-
 .../framework/RunnableConsumerStage.java      |  42 ++--
 src/main/java/teetime/framework/Stage.java    |   8 +
 .../framework/pipe/CommittablePipe.java       |  25 ---
 .../teetime/framework/pipe/DummyPipe.java     |  29 ++-
 .../java/teetime/framework/pipe/IPipe.java    |  17 +-
 .../pipe/OrderedGrowableArrayPipe.java        |   5 -
 .../framework/pipe/OrderedGrowablePipe.java   |   5 -
 .../teetime/framework/pipe/RelayTestPipe.java |   5 -
 .../framework/pipe/SingleElementPipe.java     |   5 -
 .../java/teetime/framework/pipe/SpScPipe.java |   5 -
 .../framework/pipe/UnboundedSpScPipe.java     |   5 -
 .../framework/pipe/UnorderedGrowablePipe.java |   6 -
 .../teetime/framework/signal/ISignal.java     |   4 +-
 .../framework/signal/StartingSignal.java      |   4 +-
 .../framework/signal/TerminatingSignal.java   |   4 +-
 .../framework/signal/ValidatingSignal.java    |   4 +-
 .../concurrent/queue/PCBlockingQueue.java     | 188 ++++++++++++++++++
 .../queue/putstrategy/PutStrategy.java        |  10 +
 .../queue/putstrategy/YieldPutStrategy.java   |  21 ++
 .../takestrategy/SCParkTakeStrategy.java      |  41 ++++
 .../queue/takestrategy/TakeStrategy.java      |  10 +
 .../queue/takestrategy/YieldTakeStrategy.java |  23 +++
 .../java/teetime/framework/OldPipeline.java   |  66 +-----
 .../stage/basic/merger/MergerTestingPipe.java |  26 ++-
 31 files changed, 472 insertions(+), 189 deletions(-)
 create mode 100644 src/main/java/teetime/util/concurrent/queue/PCBlockingQueue.java
 create mode 100644 src/main/java/teetime/util/concurrent/queue/putstrategy/PutStrategy.java
 create mode 100644 src/main/java/teetime/util/concurrent/queue/putstrategy/YieldPutStrategy.java
 create mode 100644 src/main/java/teetime/util/concurrent/queue/takestrategy/SCParkTakeStrategy.java
 create mode 100644 src/main/java/teetime/util/concurrent/queue/takestrategy/TakeStrategy.java
 create mode 100644 src/main/java/teetime/util/concurrent/queue/takestrategy/YieldTakeStrategy.java

diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java
index cfc6ad91..a9e06608 100644
--- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java
+++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java
@@ -15,43 +15,38 @@
  */
 package teetime.framework;
 
-import java.lang.Thread.State;
 import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 
 import org.jctools.queues.QueueFactory;
 import org.jctools.queues.spec.ConcurrentQueueSpec;
 import org.jctools.queues.spec.Ordering;
 import org.jctools.queues.spec.Preference;
-import org.slf4j.LoggerFactory;
 
 import teetime.framework.signal.ISignal;
+import teetime.util.concurrent.queue.PCBlockingQueue;
+import teetime.util.concurrent.queue.putstrategy.PutStrategy;
+import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
+import teetime.util.concurrent.queue.takestrategy.SCParkTakeStrategy;
+import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
 
 public abstract class AbstractInterThreadPipe extends AbstractPipe {
 
-	private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractInterThreadPipe.class);
+	private final BlockingQueue<ISignal> signalQueue;
 
-	private final Queue<ISignal> signalQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
+	private volatile boolean isClosed;
 
 	protected <T> AbstractInterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
 		super(sourcePort, targetPort);
+		final Queue<ISignal> localSignalQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
+		final PutStrategy<ISignal> putStrategy = new YieldPutStrategy<ISignal>();
+		final TakeStrategy<ISignal> takeStrategy = new SCParkTakeStrategy<ISignal>();
+		signalQueue = new PCBlockingQueue<ISignal>(localSignalQueue, putStrategy, takeStrategy);
 	}
 
 	@Override
 	public void sendSignal(final ISignal signal) {
 		this.signalQueue.offer(signal);
-
-		Thread owningThread = cachedTargetStage.getOwningThread();
-		if (owningThread == null && LOGGER.isWarnEnabled()) {
-			LOGGER.warn("owningThread of " + cachedTargetStage + " is null.");
-		}
-		if (null != owningThread && isThreadWaiting(owningThread)) { // FIXME remove the null check for performance
-			owningThread.interrupt();
-		}
-	}
-
-	protected final boolean isThreadWaiting(final Thread thread) {
-		final State state = thread.getState(); // store state in variable for performance reasons
-		return state == State.WAITING || state == State.TIMED_WAITING;
 	}
 
 	/**
@@ -67,4 +62,20 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe {
 	public void reportNewElement() { // NOPMD
 		// do nothing
 	}
+
+	@Override
+	public final void waitForStartSignal() throws InterruptedException {
+		final ISignal signal = signalQueue.take();
+		signal.trigger(getTargetPort().getOwningStage());
+	}
+
+	@Override
+	public final boolean isClosed() {
+		return isClosed;
+	}
+
+	@Override
+	public final void close() {
+		isClosed = true;
+	}
 }
diff --git a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java
index d66714ad..bde2cba9 100644
--- a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java
+++ b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java
@@ -19,6 +19,8 @@ import teetime.framework.signal.ISignal;
 
 public abstract class AbstractIntraThreadPipe extends AbstractPipe {
 
+	private boolean isClosed;
+
 	protected <T> AbstractIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
 		super(sourcePort, targetPort);
 	}
@@ -34,4 +36,19 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe {
 		this.cachedTargetStage.executeWithPorts();
 	}
 
+	@Override
+	public boolean isClosed() {
+		return isClosed;
+	}
+
+	@Override
+	public void close() {
+		isClosed = true;
+	}
+
+	@SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract")
+	@Override
+	public void waitForStartSignal() throws InterruptedException {
+		// do nothing
+	}
 }
diff --git a/src/main/java/teetime/framework/AbstractPipe.java b/src/main/java/teetime/framework/AbstractPipe.java
index 7ceceae3..e0642ea8 100644
--- a/src/main/java/teetime/framework/AbstractPipe.java
+++ b/src/main/java/teetime/framework/AbstractPipe.java
@@ -56,4 +56,8 @@ public abstract class AbstractPipe implements IPipe {
 		this.cachedTargetStage = targetPort.getOwningStage();
 	}
 
+	@Override
+	public final boolean hasMore() {
+		return !isEmpty();
+	}
 }
diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java
index 0b26490f..6b6b04d2 100644
--- a/src/main/java/teetime/framework/AbstractRunnableStage.java
+++ b/src/main/java/teetime/framework/AbstractRunnableStage.java
@@ -49,12 +49,14 @@ abstract class AbstractRunnableStage implements Runnable {
 		} catch (RuntimeException e) {
 			this.logger.error("Terminating thread due to the following exception: ", e);
 			throw e;
+		} catch (InterruptedException e) {
+			this.logger.error("Terminating thread due to the following exception: ", e);
 		}
 
 		this.logger.debug("Finished runnable stage. (" + stage.getId() + ")");
 	}
 
-	protected abstract void beforeStageExecution(Stage stage);
+	protected abstract void beforeStageExecution(Stage stage) throws InterruptedException;
 
 	protected abstract void executeStage(Stage stage);
 
diff --git a/src/main/java/teetime/framework/CompositeStage.java b/src/main/java/teetime/framework/CompositeStage.java
index 210ecd4c..f71034da 100644
--- a/src/main/java/teetime/framework/CompositeStage.java
+++ b/src/main/java/teetime/framework/CompositeStage.java
@@ -98,4 +98,23 @@ public abstract class CompositeStage extends Stage {
 		INTRA_PIPE_FACTORY.create(out, in);
 	}
 
+	@Override
+	public final Thread getOwningThread() {
+		return getFirstStage().getOwningThread();
+	}
+
+	@Override
+	public final void onValidating(final List<InvalidPortConnection> invalidPortConnections) {
+		getFirstStage().onValidating(invalidPortConnections);
+	}
+
+	@Override
+	public final void onStarting() throws Exception {
+		getFirstStage().onStarting();
+	}
+
+	@Override
+	public final void onTerminating() throws Exception {
+		getFirstStage().onTerminating();
+	}
 }
diff --git a/src/main/java/teetime/framework/InputPort.java b/src/main/java/teetime/framework/InputPort.java
index f5bba1f0..d19f2500 100644
--- a/src/main/java/teetime/framework/InputPort.java
+++ b/src/main/java/teetime/framework/InputPort.java
@@ -37,4 +37,12 @@ public final class InputPort<T> extends AbstractPort<T> {
 		return this.owningStage;
 	}
 
+	public boolean isClosed() {
+		return pipe.isClosed() && !pipe.hasMore();
+	}
+
+	public void waitForStartSignal() throws InterruptedException {
+		pipe.waitForStartSignal();
+	}
+
 }
diff --git a/src/main/java/teetime/framework/OutputPort.java b/src/main/java/teetime/framework/OutputPort.java
index 1c49bfc6..99204663 100644
--- a/src/main/java/teetime/framework/OutputPort.java
+++ b/src/main/java/teetime/framework/OutputPort.java
@@ -16,6 +16,7 @@
 package teetime.framework;
 
 import teetime.framework.signal.ISignal;
+import teetime.framework.signal.TerminatingSignal;
 
 public final class OutputPort<T> extends AbstractPort<T> {
 
@@ -37,7 +38,10 @@ public final class OutputPort<T> extends AbstractPort<T> {
 	 *            to be sent; May not be <code>null</code>.
 	 */
 	public void sendSignal(final ISignal signal) {
-		this.pipe.sendSignal(signal);
+		if (signal instanceof TerminatingSignal) {
+			pipe.close();
+		}
+		pipe.sendSignal(signal);
 	}
 
 }
diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java
index 3fc6a173..898a883f 100644
--- a/src/main/java/teetime/framework/RunnableConsumerStage.java
+++ b/src/main/java/teetime/framework/RunnableConsumerStage.java
@@ -17,12 +17,11 @@ package teetime.framework;
 
 import teetime.framework.idle.IdleStrategy;
 import teetime.framework.idle.YieldStrategy;
-import teetime.framework.pipe.IPipe;
 import teetime.framework.signal.ISignal;
+import teetime.framework.signal.TerminatingSignal;
 
 final class RunnableConsumerStage extends AbstractRunnableStage {
 
-	private final IdleStrategy idleStrategy;
 	// cache the input ports here since getInputPorts() always returns a new copy
 	private final InputPort<?>[] inputPorts;
 
@@ -38,18 +37,16 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
 
 	public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy) {
 		super(stage);
-		this.idleStrategy = idleStrategy;
 		this.inputPorts = stage.getInputPorts(); // FIXME should getInputPorts() really be defined in Stage?
 	}
 
 	@Override
-	protected void beforeStageExecution(final Stage stage) {
+	protected void beforeStageExecution(final Stage stage) throws InterruptedException {
 		logger.trace("ENTRY beforeStageExecution");
 
-		do {
-			checkforSignals(stage);
-			Thread.yield();
-		} while (!stage.isStarted());
+		for (InputPort<?> inputPort : inputPorts) {
+			inputPort.waitForStartSignal();
+		}
 
 		logger.trace("EXIT beforeStageExecution");
 	}
@@ -59,33 +56,20 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
 		try {
 			stage.executeWithPorts();
 		} catch (NotEnoughInputException e) {
-			checkforSignals(stage); // check for termination
-			executeIdleStrategy(stage);
+			checkForTerminationSignal(stage);
 		}
 	}
 
-	private void executeIdleStrategy(final Stage stage) {
-		if (stage.shouldBeTerminated()) {
-			return;
-		}
-		try {
-			idleStrategy.execute();
-		} catch (InterruptedException e) {
-			// checkforSignals(); // check for termination
+	private void checkForTerminationSignal(final Stage stage) {
+		for (InputPort<?> inputPort : inputPorts) {
+			if (!inputPort.isClosed()) {
+				return;
+			}
 		}
-	}
 
-	@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
-	private void checkforSignals(final Stage stage) {
+		final ISignal signal = new TerminatingSignal();
 		for (InputPort<?> inputPort : inputPorts) {
-			final IPipe pipe = inputPort.getPipe();
-			if (pipe instanceof AbstractInterThreadPipe) { // TODO: is this needed?
-				final AbstractInterThreadPipe intraThreadPipe = (AbstractInterThreadPipe) pipe;
-				final ISignal signal = intraThreadPipe.getSignal();
-				if (null != signal) {
-					stage.onSignal(signal, inputPort);
-				}
-			}
+			stage.onSignal(signal, inputPort);
 		}
 	}
 
diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java
index 27f7a11b..08f27d4c 100644
--- a/src/main/java/teetime/framework/Stage.java
+++ b/src/main/java/teetime/framework/Stage.java
@@ -118,4 +118,12 @@ public abstract class Stage {
 
 	protected abstract boolean isStarted();
 
+	// events
+
+	public abstract void onValidating(List<InvalidPortConnection> invalidPortConnections);
+
+	public abstract void onStarting() throws Exception;
+
+	public abstract void onTerminating() throws Exception;
+
 }
diff --git a/src/main/java/teetime/framework/pipe/CommittablePipe.java b/src/main/java/teetime/framework/pipe/CommittablePipe.java
index 0d814667..bb767387 100644
--- a/src/main/java/teetime/framework/pipe/CommittablePipe.java
+++ b/src/main/java/teetime/framework/pipe/CommittablePipe.java
@@ -34,11 +34,6 @@ public final class CommittablePipe extends AbstractIntraThreadPipe {
 		pipe.connectPorts(sourcePort, targetPort);
 	}
 
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see teetime.examples.throughput.methodcall.IPipe#add(T)
-	 */
 	@Override
 	public boolean add(final Object element) {
 		this.elements.addToTailUncommitted(element);
@@ -46,11 +41,6 @@ public final class CommittablePipe extends AbstractIntraThreadPipe {
 		return true;
 	}
 
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see teetime.examples.throughput.methodcall.IPipe#removeLast()
-	 */
 	@Override
 	public Object removeLast() {
 		final Object element = this.elements.removeFromHeadUncommitted();
@@ -58,26 +48,11 @@ public final class CommittablePipe extends AbstractIntraThreadPipe {
 		return element;
 	}
 
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see teetime.examples.throughput.methodcall.IPipe#isEmpty()
-	 */
 	@Override
 	public boolean isEmpty() {
 		return this.elements.isEmpty();
 	}
 
-	/*
-	 * (non-Javadoc)
-	 *
-	 * @see teetime.examples.throughput.methodcall.IPipe#readLast()
-	 */
-	@Override
-	public Object readLast() {
-		return this.elements.getTail();
-	}
-
 	public CommittableResizableArrayQueue<?> getElements() {
 		return this.elements;
 	}
diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java
index 3b89683b..0ca1ea9b 100644
--- a/src/main/java/teetime/framework/pipe/DummyPipe.java
+++ b/src/main/java/teetime/framework/pipe/DummyPipe.java
@@ -50,11 +50,6 @@ public final class DummyPipe implements IPipe {
 		return 0;
 	}
 
-	@Override
-	public Object readLast() {
-		return null;
-	}
-
 	@Override
 	public InputPort<Object> getTargetPort() {
 		return null;
@@ -71,4 +66,28 @@ public final class DummyPipe implements IPipe {
 		// do nothing
 	}
 
+	@Override
+	public boolean isClosed() {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	@Override
+	public boolean hasMore() {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	@Override
+	public void waitForStartSignal() throws InterruptedException {
+		// TODO Auto-generated method stub
+
+	}
+
+	@Override
+	public void close() {
+		// TODO Auto-generated method stub
+
+	}
+
 }
diff --git a/src/main/java/teetime/framework/pipe/IPipe.java b/src/main/java/teetime/framework/pipe/IPipe.java
index 2ba08ad4..5db7c3ef 100644
--- a/src/main/java/teetime/framework/pipe/IPipe.java
+++ b/src/main/java/teetime/framework/pipe/IPipe.java
@@ -54,13 +54,6 @@ public interface IPipe {
 	 */
 	Object removeLast();
 
-	/**
-	 * Reads the pipe's last element, but does not delete it.
-	 *
-	 * @return The last element in the pipe.
-	 */
-	Object readLast();
-
 	/**
 	 * Retrieves the receiving port.
 	 *
@@ -84,4 +77,14 @@ public interface IPipe {
 	 */
 	void reportNewElement();
 
+	boolean isClosed();
+
+	boolean hasMore();
+
+	// "signal" handling
+
+	void waitForStartSignal() throws InterruptedException;
+
+	void close();
+
 }
diff --git a/src/main/java/teetime/framework/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/framework/pipe/OrderedGrowableArrayPipe.java
index a8fe5204..07038497 100644
--- a/src/main/java/teetime/framework/pipe/OrderedGrowableArrayPipe.java
+++ b/src/main/java/teetime/framework/pipe/OrderedGrowableArrayPipe.java
@@ -57,11 +57,6 @@ public final class OrderedGrowableArrayPipe extends AbstractIntraThreadPipe {
 		return this.size() == 0;
 	}
 
-	@Override
-	public Object readLast() {
-		return this.elements.get(this.head);
-	}
-
 	@Override
 	public int size() {
 		return this.tail - this.head;
diff --git a/src/main/java/teetime/framework/pipe/OrderedGrowablePipe.java b/src/main/java/teetime/framework/pipe/OrderedGrowablePipe.java
index d29abbbf..b62481ce 100644
--- a/src/main/java/teetime/framework/pipe/OrderedGrowablePipe.java
+++ b/src/main/java/teetime/framework/pipe/OrderedGrowablePipe.java
@@ -51,11 +51,6 @@ public class OrderedGrowablePipe extends AbstractIntraThreadPipe {
 		return this.elements.isEmpty();
 	}
 
-	@Override
-	public Object readLast() {
-		return this.elements.peek();
-	}
-
 	@Override
 	public int size() {
 		return this.elements.size();
diff --git a/src/main/java/teetime/framework/pipe/RelayTestPipe.java b/src/main/java/teetime/framework/pipe/RelayTestPipe.java
index a322449d..3b112db9 100644
--- a/src/main/java/teetime/framework/pipe/RelayTestPipe.java
+++ b/src/main/java/teetime/framework/pipe/RelayTestPipe.java
@@ -54,9 +54,4 @@ public final class RelayTestPipe<T> extends AbstractInterThreadPipe {
 		return this.numInputObjects;
 	}
 
-	@Override
-	public T readLast() {
-		return null;
-	}
-
 }
diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipe.java b/src/main/java/teetime/framework/pipe/SingleElementPipe.java
index c6d51949..c24dd42f 100644
--- a/src/main/java/teetime/framework/pipe/SingleElementPipe.java
+++ b/src/main/java/teetime/framework/pipe/SingleElementPipe.java
@@ -52,11 +52,6 @@ public final class SingleElementPipe extends AbstractIntraThreadPipe {
 		return this.element == null;
 	}
 
-	@Override
-	public Object readLast() {
-		return this.element;
-	}
-
 	@Override
 	public int size() {
 		return (this.element == null) ? 0 : 1;
diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java
index dc391f35..c1bb0694 100644
--- a/src/main/java/teetime/framework/pipe/SpScPipe.java
+++ b/src/main/java/teetime/framework/pipe/SpScPipe.java
@@ -70,11 +70,6 @@ public final class SpScPipe extends AbstractInterThreadPipe {
 		return this.queue.size();
 	}
 
-	@Override
-	public Object readLast() {
-		return this.queue.peek();
-	}
-
 	// BETTER find a solution w/o any thread-safe code in this stage
 	public synchronized int getNumWaits() {
 		return this.numWaits;
diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
index 07f906a8..424e6b49 100644
--- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
+++ b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
@@ -56,9 +56,4 @@ public final class UnboundedSpScPipe extends AbstractInterThreadPipe {
 		return this.queue.size();
 	}
 
-	@Override
-	public Object readLast() {
-		return this.queue.peek();
-	}
-
 }
diff --git a/src/main/java/teetime/framework/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/framework/pipe/UnorderedGrowablePipe.java
index dc55ccd5..8d2c86b7 100644
--- a/src/main/java/teetime/framework/pipe/UnorderedGrowablePipe.java
+++ b/src/main/java/teetime/framework/pipe/UnorderedGrowablePipe.java
@@ -63,12 +63,6 @@ public final class UnorderedGrowablePipe extends AbstractIntraThreadPipe {
 		return this.lastFreeIndex == 0;
 	}
 
-	@Override
-	public Object readLast() {
-		return this.elements[this.lastFreeIndex - 1];
-		// return this.elements.get(this.lastFreeIndex - 1);
-	}
-
 	@Override
 	public int size() {
 		return this.lastFreeIndex;
diff --git a/src/main/java/teetime/framework/signal/ISignal.java b/src/main/java/teetime/framework/signal/ISignal.java
index ee6d5152..1e0df9fb 100644
--- a/src/main/java/teetime/framework/signal/ISignal.java
+++ b/src/main/java/teetime/framework/signal/ISignal.java
@@ -15,9 +15,9 @@
  */
 package teetime.framework.signal;
 
-import teetime.framework.AbstractStage;
+import teetime.framework.Stage;
 
 public interface ISignal {
 
-	void trigger(AbstractStage stage);
+	void trigger(Stage stage);
 }
diff --git a/src/main/java/teetime/framework/signal/StartingSignal.java b/src/main/java/teetime/framework/signal/StartingSignal.java
index 646443be..256d7cde 100644
--- a/src/main/java/teetime/framework/signal/StartingSignal.java
+++ b/src/main/java/teetime/framework/signal/StartingSignal.java
@@ -21,7 +21,7 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import teetime.framework.AbstractStage;
+import teetime.framework.Stage;
 
 public final class StartingSignal implements ISignal {
 
@@ -31,7 +31,7 @@ public final class StartingSignal implements ISignal {
 	public StartingSignal() {}
 
 	@Override
-	public void trigger(final AbstractStage stage) {
+	public void trigger(final Stage stage) {
 		try {
 			stage.onStarting();
 		} catch (Exception e) { // NOCS (Stages can throw any arbitrary Exception)
diff --git a/src/main/java/teetime/framework/signal/TerminatingSignal.java b/src/main/java/teetime/framework/signal/TerminatingSignal.java
index d43f21f3..4c71e301 100644
--- a/src/main/java/teetime/framework/signal/TerminatingSignal.java
+++ b/src/main/java/teetime/framework/signal/TerminatingSignal.java
@@ -21,7 +21,7 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import teetime.framework.AbstractStage;
+import teetime.framework.Stage;
 
 public final class TerminatingSignal implements ISignal {
 
@@ -31,7 +31,7 @@ public final class TerminatingSignal implements ISignal {
 	public TerminatingSignal() {}
 
 	@Override
-	public void trigger(final AbstractStage stage) {
+	public void trigger(final Stage stage) {
 		try {
 			stage.onTerminating();
 		} catch (Exception e) { // NOCS (Stages can throw any arbitrary Exception)
diff --git a/src/main/java/teetime/framework/signal/ValidatingSignal.java b/src/main/java/teetime/framework/signal/ValidatingSignal.java
index 51757ca2..47137408 100644
--- a/src/main/java/teetime/framework/signal/ValidatingSignal.java
+++ b/src/main/java/teetime/framework/signal/ValidatingSignal.java
@@ -18,7 +18,7 @@ package teetime.framework.signal;
 import java.util.LinkedList;
 import java.util.List;
 
-import teetime.framework.AbstractStage;
+import teetime.framework.Stage;
 import teetime.framework.validation.InvalidPortConnection;
 
 public final class ValidatingSignal implements ISignal {
@@ -28,7 +28,7 @@ public final class ValidatingSignal implements ISignal {
 	public ValidatingSignal() {}
 
 	@Override
-	public void trigger(final AbstractStage stage) {
+	public void trigger(final Stage stage) {
 		stage.onValidating(this.invalidPortConnections);
 	}
 
diff --git a/src/main/java/teetime/util/concurrent/queue/PCBlockingQueue.java b/src/main/java/teetime/util/concurrent/queue/PCBlockingQueue.java
new file mode 100644
index 00000000..262257c2
--- /dev/null
+++ b/src/main/java/teetime/util/concurrent/queue/PCBlockingQueue.java
@@ -0,0 +1,188 @@
+package teetime.util.concurrent.queue;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import teetime.util.concurrent.queue.putstrategy.PutStrategy;
+import teetime.util.concurrent.queue.takestrategy.TakeStrategy;
+
+public final class PCBlockingQueue<E> implements BlockingQueue<E> {
+
+	private final Queue<E> q;
+	private final PutStrategy<E> putStrategy;
+	private final TakeStrategy<E> takeStrategy;
+
+	// public PCBlockingQueue(final Queue<E> q, final Class<TakeStrategy<E>> takeStrategyClass, final Class<PutStrategy<E>> putStrategyClass) {
+	public PCBlockingQueue(final Queue<E> q, final PutStrategy<E> putStrategy, final TakeStrategy<E> takeStrategy) {
+		this.q = q;
+		this.putStrategy = putStrategy;
+		this.takeStrategy = takeStrategy;
+	}
+
+	@Override
+	public void put(final E e) throws InterruptedException
+	{
+		putStrategy.backoffOffer(q, e);
+	}
+
+	@Override
+	public E take() throws InterruptedException
+	{
+		return takeStrategy.waitPoll(q);
+	}
+
+	@Override
+	public boolean offer(final E e)
+	{
+		boolean offered = q.offer(e);
+		if (offered) {
+			takeStrategy.signal();
+		}
+		return offered;
+	}
+
+	@Override
+	public E poll()
+	{
+		E e = q.poll();
+		if (e != null) {
+			putStrategy.signal();
+		}
+		return e;
+	}
+
+	@Override
+	public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int remainingCapacity() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int drainTo(final Collection<? super E> c)
+	{
+		int count = 0;
+		E e;
+		while ((e = poll()) != null)
+		{
+			c.add(e);
+			count++;
+		}
+		return count;
+	}
+
+	@Override
+	public int drainTo(final Collection<? super E> c, final int maxElements)
+	{
+		int count = 0;
+		E e;
+		while (((e = poll()) != null) && count < maxElements)
+		{
+			c.add(e);
+			count++;
+		}
+		return count;
+	}
+
+	@Override
+	public boolean add(final E e) {
+		return q.add(e);
+	}
+
+	@Override
+	public int size() {
+		return q.size();
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return q.isEmpty();
+	}
+
+	@Override
+	public boolean contains(final Object o) {
+		return q.contains(o);
+	}
+
+	@Override
+	public Iterator<E> iterator() {
+		return q.iterator();
+	}
+
+	@Override
+	public E remove() {
+		return q.remove();
+	}
+
+	@Override
+	public Object[] toArray() {
+		return q.toArray();
+	}
+
+	@Override
+	public E element() {
+		return q.element();
+	}
+
+	@Override
+	public E peek() {
+		return q.peek();
+	}
+
+	@Override
+	public <T> T[] toArray(final T[] a) {
+		return q.toArray(a);
+	}
+
+	@Override
+	public boolean remove(final Object o) {
+		return q.remove(o);
+	}
+
+	@Override
+	public boolean containsAll(final Collection<?> c) {
+		return q.containsAll(c);
+	}
+
+	@Override
+	public boolean addAll(final Collection<? extends E> c) {
+		return q.addAll(c);
+	}
+
+	@Override
+	public boolean removeAll(final Collection<?> c) {
+		return q.removeAll(c);
+	}
+
+	@Override
+	public boolean retainAll(final Collection<?> c) {
+		return q.retainAll(c);
+	}
+
+	@Override
+	public void clear() {
+		q.clear();
+	}
+
+	@Override
+	public boolean equals(final Object o) {
+		return q.equals(o);
+	}
+
+	@Override
+	public int hashCode() {
+		return q.hashCode();
+	}
+
+}
diff --git a/src/main/java/teetime/util/concurrent/queue/putstrategy/PutStrategy.java b/src/main/java/teetime/util/concurrent/queue/putstrategy/PutStrategy.java
new file mode 100644
index 00000000..df20e0f4
--- /dev/null
+++ b/src/main/java/teetime/util/concurrent/queue/putstrategy/PutStrategy.java
@@ -0,0 +1,10 @@
+package teetime.util.concurrent.queue.putstrategy;
+
+import java.util.Queue;
+
+public interface PutStrategy<E>
+{
+	void backoffOffer(Queue<E> q, E e);
+
+	void signal();
+}
diff --git a/src/main/java/teetime/util/concurrent/queue/putstrategy/YieldPutStrategy.java b/src/main/java/teetime/util/concurrent/queue/putstrategy/YieldPutStrategy.java
new file mode 100644
index 00000000..a92a5000
--- /dev/null
+++ b/src/main/java/teetime/util/concurrent/queue/putstrategy/YieldPutStrategy.java
@@ -0,0 +1,21 @@
+package teetime.util.concurrent.queue.putstrategy;
+
+import java.util.Queue;
+
+public class YieldPutStrategy<E> implements PutStrategy<E>
+{
+	@Override
+	public void backoffOffer(final Queue<E> q, final E e)
+	{
+		while (!q.offer(e))
+		{
+			Thread.yield();
+		}
+	}
+
+	@Override
+	public void signal()
+	{
+		// Nothing
+	}
+}
diff --git a/src/main/java/teetime/util/concurrent/queue/takestrategy/SCParkTakeStrategy.java b/src/main/java/teetime/util/concurrent/queue/takestrategy/SCParkTakeStrategy.java
new file mode 100644
index 00000000..b90843ce
--- /dev/null
+++ b/src/main/java/teetime/util/concurrent/queue/takestrategy/SCParkTakeStrategy.java
@@ -0,0 +1,41 @@
+package teetime.util.concurrent.queue.takestrategy;
+
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
+
+public final class SCParkTakeStrategy<E> implements TakeStrategy<E> {
+
+	public volatile int storeFence = 0;
+
+	private final AtomicReference<Thread> t = new AtomicReference<Thread>(null);
+
+	@Override
+	public void signal()
+	{
+		// Make sure the offer is visible before unpark
+		storeFence = 1; // store barrier
+
+		LockSupport.unpark(t.get()); // t.get() load barrier
+	}
+
+	@Override
+	public E waitPoll(final Queue<E> q) throws InterruptedException
+	{
+		E e = q.poll();
+		if (e != null)
+		{
+			return e;
+		}
+
+		t.set(Thread.currentThread());
+
+		while ((e = q.poll()) == null) {
+			LockSupport.park();
+		}
+
+		t.lazySet(null);
+
+		return e;
+	}
+}
diff --git a/src/main/java/teetime/util/concurrent/queue/takestrategy/TakeStrategy.java b/src/main/java/teetime/util/concurrent/queue/takestrategy/TakeStrategy.java
new file mode 100644
index 00000000..f772f864
--- /dev/null
+++ b/src/main/java/teetime/util/concurrent/queue/takestrategy/TakeStrategy.java
@@ -0,0 +1,10 @@
+package teetime.util.concurrent.queue.takestrategy;
+
+import java.util.Queue;
+
+public interface TakeStrategy<E>
+{
+	void signal();
+
+	E waitPoll(Queue<E> q) throws InterruptedException;
+}
diff --git a/src/main/java/teetime/util/concurrent/queue/takestrategy/YieldTakeStrategy.java b/src/main/java/teetime/util/concurrent/queue/takestrategy/YieldTakeStrategy.java
new file mode 100644
index 00000000..00377fac
--- /dev/null
+++ b/src/main/java/teetime/util/concurrent/queue/takestrategy/YieldTakeStrategy.java
@@ -0,0 +1,23 @@
+package teetime.util.concurrent.queue.takestrategy;
+
+import java.util.Queue;
+
+public final class YieldTakeStrategy<E> implements TakeStrategy<E>
+{
+	@Override
+	public void signal()
+	{
+		// Nothing to do
+	}
+
+	@Override
+	public E waitPoll(final Queue<E> q) throws InterruptedException
+	{
+		E e;
+		while ((e = q.poll()) == null)
+		{
+			Thread.yield();
+		}
+		return e;
+	}
+}
diff --git a/src/performancetest/java/teetime/framework/OldPipeline.java b/src/performancetest/java/teetime/framework/OldPipeline.java
index 30ecb4de..5ab88cb7 100644
--- a/src/performancetest/java/teetime/framework/OldPipeline.java
+++ b/src/performancetest/java/teetime/framework/OldPipeline.java
@@ -15,17 +15,17 @@
  */
 package teetime.framework;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
-import teetime.framework.signal.ISignal;
-import teetime.framework.validation.InvalidPortConnection;
-
 @Deprecated
-public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> extends Stage {
+public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> extends CompositeStage {
 
 	protected FirstStage firstStage;
-	protected LastStage lastStage;
+	private final List<LastStage> lastStages = new ArrayList<LastStage>();
 
+	@Override
 	public FirstStage getFirstStage() {
 		return this.firstStage;
 	}
@@ -34,62 +34,18 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> exte
 		this.firstStage = firstStage;
 	}
 
-	public LastStage getLastStage() {
-		return this.lastStage;
-	}
-
 	public void setLastStage(final LastStage lastStage) {
-		this.lastStage = lastStage;
-	}
-
-	@Override
-	public void executeWithPorts() {
-		this.firstStage.executeWithPorts();
+		this.lastStages.clear();
+		this.lastStages.add(lastStage);
 	}
 
-	@Override
-	public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
-		this.firstStage.onSignal(signal, inputPort);
-	}
-
-	@Override
-	public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
-		this.lastStage.validateOutputPorts(invalidPortConnections);
-	}
-
-	@Override
-	public void terminate() {
-		firstStage.terminate();
-	}
-
-	@Override
-	public boolean shouldBeTerminated() {
-		return firstStage.shouldBeTerminated();
-	}
-
-	@Override
-	protected InputPort<?>[] getInputPorts() {
-		return firstStage.getInputPorts();
-	}
-
-	@Override
-	public Thread getOwningThread() {
-		return firstStage.getOwningThread();
-	}
-
-	@Override
-	void setOwningThread(final Thread owningThread) {
-		firstStage.setOwningThread(owningThread);
-	}
-
-	@Override
-	public TerminationStrategy getTerminationStrategy() {
-		return firstStage.getTerminationStrategy();
+	public LastStage getLastStage() {
+		return lastStages.get(0);
 	}
 
 	@Override
-	protected boolean isStarted() {
-		return firstStage.isStarted();
+	protected Collection<? extends Stage> getLastStages() {
+		return lastStages;
 	}
 
 }
diff --git a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
index 2838b263..5cf29ff7 100644
--- a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
+++ b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
@@ -70,27 +70,43 @@ public class MergerTestingPipe implements IPipe {
 	}
 
 	@Override
-	public Object readLast() {
+	public InputPort<?> getTargetPort() {
 		// TODO Auto-generated method stub
 		return null;
 	}
 
 	@Override
-	public InputPort<?> getTargetPort() {
+	public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
 		// TODO Auto-generated method stub
-		return null;
+
 	}
 
 	@Override
-	public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+	public void reportNewElement() {
 		// TODO Auto-generated method stub
 
 	}
 
 	@Override
-	public void reportNewElement() {
+	public boolean isClosed() {
 		// TODO Auto-generated method stub
+		return false;
+	}
 
+	@Override
+	public boolean hasMore() {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	@Override
+	public void waitForStartSignal() throws InterruptedException {
+		// TODO Auto-generated method stub
+	}
+
+	@Override
+	public void close() {
+		// TODO Auto-generated method stub
 	}
 
 }
-- 
GitLab