From 53ff970adb5fa5216f537df3de872e0e5884f9fe Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Sun, 31 Aug 2014 21:49:15 +0200
Subject: [PATCH] added InterThreadPipe; added RelayTestPipe;

---
 .../framework/core/AbstractStage.java         |  1 +
 .../pipe/{Pipe.java => CommittablePipe.java}  | 12 ++---
 .../framework/core/pipe/InterThreadPipe.java  | 24 ++++++++++
 .../core/pipe/OrderedGrowableArrayPipe.java   |  2 +-
 .../framework/core/pipe/RelayTestPipe.java    | 46 +++++++++++++++++++
 .../core/pipe/SingleElementPipe.java          |  6 ++-
 .../framework/core/pipe/SpScPipe.java         | 19 +-------
 .../core/pipe/UnorderedGrowablePipe.java      | 10 +---
 .../methodcallWithPorts/stage/Relay.java      |  6 +--
 src/main/java/util/PerformanceTest.java       |  2 +-
 .../MethodCallThroughputAnalysis9.java        | 12 ++---
 .../experiment16/ChwHomePerformanceCheck.java |  2 +-
 .../MethodCallThroughputAnalysis17.java       | 36 ++++++---------
 13 files changed, 111 insertions(+), 67 deletions(-)
 rename src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/{Pipe.java => CommittablePipe.java} (91%)
 create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java
 create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java

diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
index 83c2a52f..84828cf0 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
@@ -53,6 +53,7 @@ public abstract class AbstractStage implements StageWithPort {
 		outputPort.reportNewElement();
 
 		return true;
+		// return outputPort.send(element);
 	}
 
 	@SuppressWarnings("unchecked")
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java
similarity index 91%
rename from src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java
rename to src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java
index 2dd45c93..5490feb1 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java
@@ -4,19 +4,19 @@ import teetime.util.list.CommittableResizableArrayQueue;
 import teetime.variant.methodcallWithPorts.framework.core.InputPort;
 import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 
-public class Pipe<T> extends IntraThreadPipe<T> {
+public final class CommittablePipe<T> extends IntraThreadPipe<T> {
 
 	private final CommittableResizableArrayQueue<T> elements = new CommittableResizableArrayQueue<T>(null, 4);
 
 	@Deprecated
 	public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
-		IPipe<T> pipe = new Pipe<T>();
+		IPipe<T> pipe = new CommittablePipe<T>();
 		pipe.connectPorts(sourcePort, targetPort);
 	}
 
 	/*
 	 * (non-Javadoc)
-	 *
+	 * 
 	 * @see teetime.examples.throughput.methodcall.IPipe#add(T)
 	 */
 	@Override
@@ -28,7 +28,7 @@ public class Pipe<T> extends IntraThreadPipe<T> {
 
 	/*
 	 * (non-Javadoc)
-	 *
+	 * 
 	 * @see teetime.examples.throughput.methodcall.IPipe#removeLast()
 	 */
 	@Override
@@ -40,7 +40,7 @@ public class Pipe<T> extends IntraThreadPipe<T> {
 
 	/*
 	 * (non-Javadoc)
-	 *
+	 * 
 	 * @see teetime.examples.throughput.methodcall.IPipe#isEmpty()
 	 */
 	@Override
@@ -50,7 +50,7 @@ public class Pipe<T> extends IntraThreadPipe<T> {
 
 	/*
 	 * (non-Javadoc)
-	 *
+	 * 
 	 * @see teetime.examples.throughput.methodcall.IPipe#readLast()
 	 */
 	@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java
new file mode 100644
index 00000000..abc3ff3b
--- /dev/null
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java
@@ -0,0 +1,24 @@
+package teetime.variant.methodcallWithPorts.framework.core.pipe;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
+
+public abstract class InterThreadPipe<T> extends AbstractPipe<T> {
+
+	private final AtomicReference<Signal> signal = new AtomicReference<Signal>();
+
+	@Override
+	public void setSignal(final Signal signal) {
+		this.signal.lazySet(signal); // lazySet is legal due to our single-writer requirement
+	}
+
+	public Signal getSignal() {
+		return this.signal.get();
+	}
+
+	@Override
+	public void reportNewElement() {
+		// do nothing
+	}
+}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
index 7a6b4aa0..86f2b991 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
@@ -4,7 +4,7 @@ import teetime.util.concurrent.workstealing.CircularArray;
 import teetime.variant.methodcallWithPorts.framework.core.InputPort;
 import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 
-public class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> {
+public final class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> {
 
 	private CircularArray<T> elements;
 	private int head;
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java
new file mode 100644
index 00000000..d7ab5e83
--- /dev/null
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java
@@ -0,0 +1,46 @@
+package teetime.variant.methodcallWithPorts.framework.core.pipe;
+
+import teetime.util.ConstructorClosure;
+
+public final class RelayTestPipe<T> extends InterThreadPipe<T> {
+
+	private int numInputObjects;
+	private final ConstructorClosure<T> inputObjectCreator;
+
+	public RelayTestPipe(final int numInputObjects,
+			final ConstructorClosure<T> inputObjectCreator) {
+		this.numInputObjects = numInputObjects;
+		this.inputObjectCreator = inputObjectCreator;
+	}
+
+	@Override
+	public boolean add(final T element) {
+		return false;
+	}
+
+	@Override
+	public T removeLast() {
+		if (this.numInputObjects == 0) {
+			return null;
+		} else {
+			this.numInputObjects--;
+			return this.inputObjectCreator.create();
+		}
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return (this.numInputObjects == 0);
+	}
+
+	@Override
+	public int size() {
+		return this.numInputObjects;
+	}
+
+	@Override
+	public T readLast() {
+		return null;
+	}
+
+}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
index 96eeba86..1d48809f 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
@@ -3,10 +3,14 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
 import teetime.variant.methodcallWithPorts.framework.core.InputPort;
 import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 
-public class SingleElementPipe<T> extends IntraThreadPipe<T> {
+public final class SingleElementPipe<T> extends IntraThreadPipe<T> {
 
 	private T element;
 
+	SingleElementPipe() {
+		super();
+	}
+
 	@Deprecated
 	public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
 		IPipe<T> pipe = new SingleElementPipe<T>();
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
index 9153edaf..4999edef 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
@@ -1,7 +1,6 @@
 package teetime.variant.methodcallWithPorts.framework.core.pipe;
 
 import java.util.Queue;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.jctools.queues.QueueFactory;
 import org.jctools.queues.spec.ConcurrentQueueSpec;
@@ -10,12 +9,10 @@ import org.jctools.queues.spec.Preference;
 
 import teetime.variant.methodcallWithPorts.framework.core.InputPort;
 import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
-import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
 
-public class SpScPipe<T> extends AbstractPipe<T> {
+public final class SpScPipe<T> extends InterThreadPipe<T> {
 
 	private final Queue<T> queue;
-	private final AtomicReference<Signal> signal = new AtomicReference<Signal>();
 	// statistics
 	private int numWaits;
 
@@ -67,18 +64,4 @@ public class SpScPipe<T> extends AbstractPipe<T> {
 		return this.numWaits;
 	}
 
-	@Override
-	public void setSignal(final Signal signal) {
-		this.signal.lazySet(signal); // lazySet is legal due to our single-writer requirement
-	}
-
-	public Signal getSignal() {
-		return this.signal.get();
-	}
-
-	@Override
-	public void reportNewElement() {
-		// do nothing
-	}
-
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
index 3befc87d..357762b4 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
@@ -3,7 +3,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
 import teetime.variant.methodcallWithPorts.framework.core.InputPort;
 import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 
-public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
+public final class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
 
 	private final int MIN_CAPACITY;
 
@@ -12,7 +12,7 @@ public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
 	private int lastFreeIndex;
 
 	@SuppressWarnings("unchecked")
-	public UnorderedGrowablePipe() {
+	UnorderedGrowablePipe() {
 		this.MIN_CAPACITY = 4;
 		this.elements = (T[]) new Object[this.MIN_CAPACITY];
 	}
@@ -23,12 +23,6 @@ public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
 		pipe.connectPorts(sourcePort, targetPort);
 	}
 
-	@Override
-	public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
-		sourcePort.setPipe(this);
-		targetPort.setPipe(this);
-	}
-
 	@Override
 	public boolean add(final T element) {
 		if (this.lastFreeIndex == this.elements.length) {
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java
index d31041fd..2c525299 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java
@@ -2,14 +2,14 @@ package teetime.variant.methodcallWithPorts.stage;
 
 import teetime.variant.methodcallWithPorts.framework.core.InputPort;
 import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
-import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.InterThreadPipe;
 import teetime.variant.methodcallWithPorts.framework.core.signal.TerminatingSignal;
 
 public class Relay<T> extends ProducerStage<T> {
 
 	private final InputPort<T> inputPort = this.createInputPort();
 
-	private SpScPipe<T> cachedCastedInputPipe;
+	private InterThreadPipe<T> cachedCastedInputPipe;
 
 	@Override
 	public void execute() {
@@ -26,7 +26,7 @@ public class Relay<T> extends ProducerStage<T> {
 
 	@Override
 	public void onStarting() {
-		this.cachedCastedInputPipe = (SpScPipe<T>) this.inputPort.getPipe();
+		this.cachedCastedInputPipe = (InterThreadPipe<T>) this.inputPort.getPipe();
 		super.onStarting();
 	}
 
diff --git a/src/main/java/util/PerformanceTest.java b/src/main/java/util/PerformanceTest.java
index 0887ae91..7438d476 100644
--- a/src/main/java/util/PerformanceTest.java
+++ b/src/main/java/util/PerformanceTest.java
@@ -17,7 +17,7 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
 
 public abstract class PerformanceTest {
 
-	protected static final int NUM_OBJECTS_TO_CREATE = 100000;
+	protected static final int NUM_OBJECTS_TO_CREATE = 1000000;
 	protected static final int NUM_NOOP_FILTERS = 800;
 
 	public static final MeasurementRepository measurementRepository = new MeasurementRepository();
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java
index f55156f5..117954b8 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java
@@ -23,7 +23,7 @@ import teetime.variant.explicitScheduling.framework.core.Analysis;
 import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.HeadStage;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
-import teetime.variant.methodcallWithPorts.framework.core.pipe.Pipe;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.CommittablePipe;
 import teetime.variant.methodcallWithPorts.stage.CollectorSink;
 import teetime.variant.methodcallWithPorts.stage.NoopFilter;
 import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
@@ -71,13 +71,13 @@ public class MethodCallThroughputAnalysis9 extends Analysis {
 		pipeline.setFirstStage(objectProducer);
 		pipeline.setLastStage(collectorSink);
 
-		Pipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
-		Pipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
+		CommittablePipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
+		CommittablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
 		for (int i = 0; i < noopFilters.length - 1; i++) {
-			Pipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
+			CommittablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
 		}
-		Pipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
-		Pipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
+		CommittablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
+		CommittablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
 
 		return pipeline;
 	}
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/ChwHomePerformanceCheck.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/ChwHomePerformanceCheck.java
index 8c73c415..8e3fe93b 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/ChwHomePerformanceCheck.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/ChwHomePerformanceCheck.java
@@ -28,6 +28,6 @@ public class ChwHomePerformanceCheck implements PerformanceCheckProfile {
 		System.out.println("speedupC: " + speedupC);
 
 		assertEquals(2, speedupB, 0.3);
-		assertEquals(3.5, speedupC, 0.3);
+		assertEquals(3.6, speedupC, 0.3);
 	}
 }
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java
index 1cf3daf7..7d662b6a 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java
@@ -25,10 +25,10 @@ import teetime.variant.explicitScheduling.framework.core.Analysis;
 import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.DummyPipe;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory;
-import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
-import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.RelayTestPipe;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe;
 import teetime.variant.methodcallWithPorts.framework.core.signal.TerminatingSignal;
 import teetime.variant.methodcallWithPorts.stage.CollectorSink;
@@ -110,6 +110,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
 		super.init();
 	}
 
+	@SuppressWarnings("unchecked")
 	private HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects,
 			final ConstructorClosure<TimestampObject> inputObjectCreator) {
 		final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator);
@@ -117,21 +118,19 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
 		Sink<TimestampObject> sink = new Sink<TimestampObject>();
 		Sink<Void> endStage = new Sink<Void>();
 
+		// UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), sink.getInputPort());
+		// objectProducer.getOutputPort().pipe = new UnorderedGrowablePipe<TimestampObject>();
+
+		UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), distributor.getInputPort());
+		distributor.getNewOutputPort().setPipe(new DummyPipe());
+
 		final HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>();
 		pipeline.setFirstStage(objectProducer);
 		// pipeline.setFirstStage(sink);
 		// pipeline.setFirstStage(endStage);
-
 		pipeline.setLastStage(distributor);
 		// pipeline.setLastStage(sink);
 		// pipeline.setLastStage(new EndStage<TimestampObject>());
-
-		// UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), sink.getInputPort());
-		// objectProducer.getOutputPort().pipe = new UnorderedGrowablePipe<TimestampObject>();
-
-		UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), distributor.getInputPort());
-		distributor.getNewOutputPort().setPipe(new UnorderedGrowablePipe<TimestampObject>());
-
 		return pipeline;
 	}
 
@@ -152,21 +151,11 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
 		final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
 		final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
 
-		final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
-		pipeline.setFirstStage(relay);
-		pipeline.setLastStage(collectorSink);
-
-		IPipe<TimestampObject> pipe = this.pipeFactory.create(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false, SPSC_INITIAL_CAPACITY);
-		relay.getInputPort().setPipe(pipe);
-		IPipe<TimestampObject> startPipe = relay.getInputPort().getPipe();
-		for (int i = 0; i < this.numInputObjects; i++) {
-			startPipe.add(this.inputObjectCreator.create());
-		}
-		// startPipe.close();
+		IPipe<TimestampObject> startPipe = new RelayTestPipe<TimestampObject>(this.numInputObjects, this.inputObjectCreator);
 		startPipe.setSignal(new TerminatingSignal());
 
+		relay.getInputPort().setPipe(startPipe);
 		UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());
-
 		UnorderedGrowablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
 		for (int i = 0; i < noopFilters.length - 1; i++) {
 			UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
@@ -174,6 +163,9 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
 		UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
 		UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
 
+		final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
+		pipeline.setFirstStage(relay);
+		pipeline.setLastStage(collectorSink);
 		return pipeline;
 	}
 
-- 
GitLab