From 14d733bb9ae935bf67f33d5edf18541666878150 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Tue, 17 Jun 2014 09:41:27 +0200
Subject: [PATCH] added spsc pipe

---
 results/overhead-findings.txt                 |   1 +
 .../util/concurrent/P1C1QueueOriginal3.java   | 198 ------------
 .../concurrent/spsc/FFBufferOrdered3.java     | 305 ++++++++++++++++++
 .../teetime/util/concurrent/spsc/Pow2.java    |  16 +
 .../util/concurrent/spsc/UnsafeAccess.java    |  26 ++
 ...dCallThoughputTimestampAnalysis14Test.java |  74 +++++
 .../MethodCallThroughputAnalysis14.java       | 125 +++++++
 .../throughput/methodcall/SpScPipe.java       |  40 +++
 8 files changed, 587 insertions(+), 198 deletions(-)
 delete mode 100644 src/main/java/teetime/util/concurrent/P1C1QueueOriginal3.java
 create mode 100644 src/main/java/teetime/util/concurrent/spsc/FFBufferOrdered3.java
 create mode 100644 src/main/java/teetime/util/concurrent/spsc/Pow2.java
 create mode 100644 src/main/java/teetime/util/concurrent/spsc/UnsafeAccess.java
 create mode 100644 src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis14Test.java
 create mode 100644 src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis14.java
 create mode 100644 src/test/java/teetime/examples/throughput/methodcall/SpScPipe.java

diff --git a/results/overhead-findings.txt b/results/overhead-findings.txt
index 2ed967b2..a0da29fd 100644
--- a/results/overhead-findings.txt
+++ b/results/overhead-findings.txt
@@ -26,3 +26,4 @@
 	11: 7800 ns (executeWithPorts: fixed sized pipe; with setReschedulable() after each read)
 12: 3300 ns (recursive; argument/return w/o pipe)
 13: 3300 ns (recursive; argument/return w/o pipe; w/o pipeline class)
+14: 21,000 ns (spsc pipe)
diff --git a/src/main/java/teetime/util/concurrent/P1C1QueueOriginal3.java b/src/main/java/teetime/util/concurrent/P1C1QueueOriginal3.java
deleted file mode 100644
index 996f6490..00000000
--- a/src/main/java/teetime/util/concurrent/P1C1QueueOriginal3.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Copyright 2012 Real Logic Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package teetime.util.concurrent;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * <ul>
- * <li>Lock free, observing single writer principal.
- * <li>Replacing the long fields with AtomicLong and using lazySet instead of volatile assignment.
- * <li>Using the power of 2 mask, forcing the capacity to next power of 2.
- * <li>Adding head and tail cache fields. Avoiding redundant volatile reads.
- * <li>Padding head/tail AtomicLong fields. Avoiding false sharing.
- * <li>Padding head/tail cache fields. Avoiding false sharing.
- * </ul>
- */
-public final class P1C1QueueOriginal3<E> implements Queue<E> {
-	private final int capacity;
-	private final int mask;
-	private final E[] buffer;
-
-	private final AtomicLong tail = new PaddedAtomicLong(0);
-	private final AtomicLong head = new PaddedAtomicLong(0);
-
-	public static class PaddedLong {
-		public long value = 0, p1, p2, p3, p4, p5, p6;
-	}
-
-	private final PaddedLong tailCache = new PaddedLong();
-	private final PaddedLong headCache = new PaddedLong();
-
-	@SuppressWarnings("unchecked")
-	public P1C1QueueOriginal3(final int capacity) {
-		this.capacity = P1C1QueueOriginal3.findNextPositivePowerOfTwo(capacity);
-		this.mask = this.capacity - 1;
-		this.buffer = (E[]) new Object[this.capacity];
-	}
-
-	public static int findNextPositivePowerOfTwo(final int value) {
-		return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
-	}
-
-	public boolean add(final E e) {
-		if (this.offer(e)) {
-			return true;
-		}
-
-		throw new IllegalStateException("Queue is full");
-	}
-
-	public boolean offer(final E e) {
-		if (null == e) {
-			throw new NullPointerException("Null is not a valid element");
-		}
-
-		final long currentTail = this.tail.get();
-		final long wrapPoint = currentTail - this.capacity;
-		if (this.headCache.value <= wrapPoint) {
-			this.headCache.value = this.head.get();
-			if (this.headCache.value <= wrapPoint) {
-				return false;
-			}
-		}
-
-		this.buffer[(int) currentTail & this.mask] = e;
-		this.tail.lazySet(currentTail + 1);
-
-		return true;
-	}
-
-	public E poll() {
-		final long currentHead = this.head.get();
-		if (currentHead >= this.tailCache.value) {
-			this.tailCache.value = this.tail.get();
-			if (currentHead >= this.tailCache.value) {
-				return null;
-			}
-		}
-
-		final int index = (int) currentHead & this.mask;
-		final E e = this.buffer[index];
-		this.buffer[index] = null;
-		this.head.lazySet(currentHead + 1);
-
-		return e;
-	}
-
-	public E remove() {
-		final E e = this.poll();
-		if (null == e) {
-			throw new NoSuchElementException("Queue is empty");
-		}
-
-		return e;
-	}
-
-	public E element() {
-		final E e = this.peek();
-		if (null == e) {
-			throw new NoSuchElementException("Queue is empty");
-		}
-
-		return e;
-	}
-
-	public E peek() {
-		return this.buffer[(int) this.head.get() & this.mask];
-	}
-
-	public int size() {
-		return (int) (this.tail.get() - this.head.get());
-	}
-
-	public boolean isEmpty() {
-		return this.tail.get() == this.head.get();
-	}
-
-	public boolean contains(final Object o) {
-		if (null == o) {
-			return false;
-		}
-
-		for (long i = this.head.get(), limit = this.tail.get(); i < limit; i++) {
-			final E e = this.buffer[(int) i & this.mask];
-			if (o.equals(e)) {
-				return true;
-			}
-		}
-
-		return false;
-	}
-
-	public Iterator<E> iterator() {
-		throw new UnsupportedOperationException();
-	}
-
-	public Object[] toArray() {
-		throw new UnsupportedOperationException();
-	}
-
-	public <T> T[] toArray(final T[] a) {
-		throw new UnsupportedOperationException();
-	}
-
-	public boolean remove(final Object o) {
-		throw new UnsupportedOperationException();
-	}
-
-	public boolean containsAll(final Collection<?> c) {
-		for (final Object o : c) {
-			if (!this.contains(o)) {
-				return false;
-			}
-		}
-
-		return true;
-	}
-
-	public boolean addAll(final Collection<? extends E> c) {
-		for (final E e : c) {
-			this.add(e);
-		}
-
-		return true;
-	}
-
-	public boolean removeAll(final Collection<?> c) {
-		throw new UnsupportedOperationException();
-	}
-
-	public boolean retainAll(final Collection<?> c) {
-		throw new UnsupportedOperationException();
-	}
-
-	public void clear() {
-		Object value;
-		do {
-			value = this.poll();
-		} while (null != value);
-	}
-}
diff --git a/src/main/java/teetime/util/concurrent/spsc/FFBufferOrdered3.java b/src/main/java/teetime/util/concurrent/spsc/FFBufferOrdered3.java
new file mode 100644
index 00000000..9d8f0edb
--- /dev/null
+++ b/src/main/java/teetime/util/concurrent/spsc/FFBufferOrdered3.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package teetime.util.concurrent.spsc;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+
+/**
+ * <ul>
+ * <li>Inlined counters
+ * <li>Counters are padded
+ * <li>Data is padded
+ * <li>Class is pre-padded
+ * <li>Padding is doubled to dodge pre-fetch
+ * <li>Use Unsafe to read out of array
+ * <li>putOrdered into array as Write Memory Barrier
+ * <li>getVolatile from array as Read Memory Barrier
+ * <li>ELEMENT_SHIFT is a constant
+ * </ul>
+ */
+class L0Pad {
+	public long p00, p01, p02, p03, p04, p05, p06, p07;
+	public long p30, p31, p32, p33, p34, p35, p36, p37;
+}
+
+class ColdFields<E> extends L0Pad {
+	protected static final int BUFFER_PAD = 32;
+	protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 2);
+	protected final int capacity;
+	protected final long mask;
+	protected final E[] buffer;
+
+	@SuppressWarnings("unchecked")
+	public ColdFields(final int capacity) {
+		if (Pow2.isPowerOf2(capacity)) {
+			this.capacity = capacity;
+		}
+		else {
+			this.capacity = Pow2.findNextPositivePowerOfTwo(capacity);
+		}
+		this.mask = this.capacity - 1;
+		// pad data on either end with some empty slots.
+		this.buffer = (E[]) new Object[(this.capacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
+	}
+}
+
+class L1Pad<E> extends ColdFields<E> {
+	public long p10, p11, p12, p13, p14, p15, p16;
+	public long p30, p31, p32, p33, p34, p35, p36, p37;
+
+	public L1Pad(final int capacity) {
+		super(capacity);
+	}
+}
+
+class TailField<E> extends L1Pad<E> {
+	protected long tail;
+
+	public TailField(final int capacity) {
+		super(capacity);
+	}
+}
+
+class L2Pad<E> extends TailField<E> {
+	public long p20, p21, p22, p23, p24, p25, p26;
+	public long p30, p31, p32, p33, p34, p35, p36, p37;
+
+	public L2Pad(final int capacity) {
+		super(capacity);
+	}
+}
+
+class HeadField<E> extends L2Pad<E> {
+	protected long head;
+
+	public HeadField(final int capacity) {
+		super(capacity);
+	}
+}
+
+class L3Pad<E> extends HeadField<E> {
+	public long p40, p41, p42, p43, p44, p45, p46;
+	public long p30, p31, p32, p33, p34, p35, p36, p37;
+
+	public L3Pad(final int capacity) {
+		super(capacity);
+	}
+}
+
+@SuppressWarnings("restriction")
+public final class FFBufferOrdered3<E> extends L3Pad<E> implements Queue<E> {
+	private final static long TAIL_OFFSET;
+	private final static long HEAD_OFFSET;
+	private static final long ARRAY_BASE;
+	private static final int ELEMENT_SHIFT;
+	static {
+		try {
+			TAIL_OFFSET = UnsafeAccess.UNSAFE.objectFieldOffset(TailField.class.getDeclaredField("tail"));
+			HEAD_OFFSET = UnsafeAccess.UNSAFE.objectFieldOffset(HeadField.class.getDeclaredField("head"));
+			final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
+			if (4 == scale) {
+				ELEMENT_SHIFT = 2 + SPARSE_SHIFT;
+			} else if (8 == scale) {
+				ELEMENT_SHIFT = 3 + SPARSE_SHIFT;
+			} else {
+				throw new IllegalStateException("Unknown pointer size");
+			}
+			// Including the buffer pad in the array base offset
+			ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << (ELEMENT_SHIFT - SPARSE_SHIFT));
+		} catch (NoSuchFieldException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	public FFBufferOrdered3(final int capacity) {
+		super(capacity);
+	}
+
+	private long getHead() {
+		return UnsafeAccess.UNSAFE.getLongVolatile(this, HEAD_OFFSET);
+	}
+
+	private long getTail() {
+		return UnsafeAccess.UNSAFE.getLongVolatile(this, TAIL_OFFSET);
+	}
+
+	@Override
+	public boolean add(final E e) {
+		if (this.offer(e)) {
+			return true;
+		}
+		throw new IllegalStateException("Queue is full");
+	}
+
+	private long elementOffsetInBuffer(final long index) {
+		return ARRAY_BASE + ((index & this.mask) << ELEMENT_SHIFT);
+	}
+
+	// private boolean available() {
+	// final long offset = this.elementOffsetInBuffer(this.tail);
+	// Object object = UnsafeAccess.UNSAFE.getObjectVolatile(this.buffer, offset);
+	// return object != null;
+	// }
+
+	@Override
+	public boolean offer(final E e) {
+		if (null == e) {
+			throw new NullPointerException("Null is not a valid element");
+		}
+
+		final long offset = this.elementOffsetInBuffer(this.tail);
+		if (null != UnsafeAccess.UNSAFE.getObjectVolatile(this.buffer, offset)) {
+			return false;
+		}
+		// STORE/STORE barrier, anything that happens before is visible
+		// when the value in the buffer is visible
+		UnsafeAccess.UNSAFE.putOrderedObject(this.buffer, offset, e);
+		this.tail++;
+		return true;
+	}
+
+	@Override
+	public E poll() {
+		final long offset = this.elementOffsetInBuffer(this.head);
+		@SuppressWarnings("unchecked")
+		final E e = (E) UnsafeAccess.UNSAFE.getObjectVolatile(this.buffer, offset);
+		if (null == e) {
+			return null;
+		}
+		UnsafeAccess.UNSAFE.putOrderedObject(this.buffer, offset, null);
+		this.head++;
+		return e;
+	}
+
+	@Override
+	public E remove() {
+		final E e = this.poll();
+		if (null == e) {
+			throw new NoSuchElementException("Queue is empty");
+		}
+
+		return e;
+	}
+
+	@Override
+	public E element() {
+		final E e = this.peek();
+		if (null == e) {
+			throw new NoSuchElementException("Queue is empty");
+		}
+
+		return e;
+	}
+
+	@Override
+	public E peek() {
+		long currentHead = this.getHead();
+		return this.getElement(currentHead);
+	}
+
+	@SuppressWarnings("unchecked")
+	private E getElement(final long index) {
+		return (E) UnsafeAccess.UNSAFE.getObject(this.buffer, this.elementOffsetInBuffer(index));
+	}
+
+	@Override
+	public int size() {
+		return (int) (this.getTail() - this.getHead());
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return this.getTail() == this.getHead();
+		// return null == this.getHead();
+	}
+
+	@Override
+	public boolean contains(final Object o) {
+		if (null == o) {
+			return false;
+		}
+
+		for (long i = this.getHead(), limit = this.getTail(); i < limit; i++) {
+			final E e = this.getElement(i);
+			if (o.equals(e)) {
+				return true;
+			}
+		}
+
+		return false;
+	}
+
+	@Override
+	public Iterator<E> iterator() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Object[] toArray() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <T> T[] toArray(final T[] a) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean remove(final Object o) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean containsAll(final Collection<?> c) {
+		for (final Object o : c) {
+			if (!this.contains(o)) {
+				return false;
+			}
+		}
+
+		return true;
+	}
+
+	@Override
+	public boolean addAll(final Collection<? extends E> c) {
+		for (final E e : c) {
+			this.add(e);
+		}
+
+		return true;
+	}
+
+	@Override
+	public boolean removeAll(final Collection<?> c) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean retainAll(final Collection<?> c) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void clear() {
+		Object value;
+		do {
+			value = this.poll();
+		} while (null != value);
+	}
+
+}
diff --git a/src/main/java/teetime/util/concurrent/spsc/Pow2.java b/src/main/java/teetime/util/concurrent/spsc/Pow2.java
new file mode 100644
index 00000000..266964aa
--- /dev/null
+++ b/src/main/java/teetime/util/concurrent/spsc/Pow2.java
@@ -0,0 +1,16 @@
+package teetime.util.concurrent.spsc;
+
+public class Pow2 {
+
+	private Pow2() {
+		// utility class
+	}
+
+	public static int findNextPositivePowerOfTwo(final int value) {
+		return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
+	}
+
+	public static boolean isPowerOf2(final int value) {
+		return (value & (value - 1)) == 0;
+	}
+}
diff --git a/src/main/java/teetime/util/concurrent/spsc/UnsafeAccess.java b/src/main/java/teetime/util/concurrent/spsc/UnsafeAccess.java
new file mode 100644
index 00000000..0c717842
--- /dev/null
+++ b/src/main/java/teetime/util/concurrent/spsc/UnsafeAccess.java
@@ -0,0 +1,26 @@
+package teetime.util.concurrent.spsc;
+
+import java.lang.reflect.Field;
+
+import sun.misc.Unsafe;
+
+@SuppressWarnings("restriction")
+class UnsafeAccess {
+
+	public static final Unsafe UNSAFE;
+
+	static {
+		try {
+			Field field = Unsafe.class.getDeclaredField("theUnsafe");
+			field.setAccessible(true);
+			UNSAFE = (Unsafe) field.get(null);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	private UnsafeAccess() {
+		// utility class
+	}
+
+}
diff --git a/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis14Test.java b/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis14Test.java
new file mode 100644
index 00000000..67579159
--- /dev/null
+++ b/src/test/java/teetime/examples/throughput/MethodCallThoughputTimestampAnalysis14Test.java
@@ -0,0 +1,74 @@
+/***************************************************************************
+ * Copyright 2014 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+package teetime.examples.throughput;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import teetime.examples.throughput.methodcall.MethodCallThroughputAnalysis14;
+import teetime.util.StatisticsUtil;
+import teetime.util.StopWatch;
+
+import kieker.common.logging.LogFactory;
+
+/**
+ * @author Christian Wulf
+ * 
+ * @since 1.10
+ */
+public class MethodCallThoughputTimestampAnalysis14Test {
+
+	private static final int NUM_OBJECTS_TO_CREATE = 100000;
+	private static final int NUM_NOOP_FILTERS = 800;
+
+	@Before
+	public void before() {
+		System.setProperty(LogFactory.CUSTOM_LOGGER_JVM, "NONE");
+	}
+
+	@Test
+	public void testWithManyObjects() {
+		System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS="
+				+ NUM_NOOP_FILTERS + "...");
+		final StopWatch stopWatch = new StopWatch();
+		final List<TimestampObject> timestampObjects = new ArrayList<TimestampObject>(NUM_OBJECTS_TO_CREATE);
+
+		final MethodCallThroughputAnalysis14 analysis = new MethodCallThroughputAnalysis14();
+		analysis.setNumNoopFilters(NUM_NOOP_FILTERS);
+		analysis.setTimestampObjects(timestampObjects);
+		analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() {
+			@Override
+			public TimestampObject call() throws Exception {
+				return new TimestampObject();
+			}
+		});
+		analysis.init();
+
+		stopWatch.start();
+		try {
+			analysis.start();
+		} finally {
+			stopWatch.end();
+		}
+
+		StatisticsUtil.printStatistics(stopWatch.getDurationInNs(), timestampObjects);
+	}
+
+}
diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis14.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis14.java
new file mode 100644
index 00000000..61454d0b
--- /dev/null
+++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis14.java
@@ -0,0 +1,125 @@
+/***************************************************************************
+ * Copyright 2014 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+package teetime.examples.throughput.methodcall;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import teetime.examples.throughput.TimestampObject;
+import teetime.examples.throughput.methodcall.stage.CollectorSink;
+import teetime.examples.throughput.methodcall.stage.NoopFilter;
+import teetime.examples.throughput.methodcall.stage.ObjectProducer;
+import teetime.examples.throughput.methodcall.stage.Pipeline;
+import teetime.examples.throughput.methodcall.stage.StartTimestampFilter;
+import teetime.examples.throughput.methodcall.stage.StopTimestampFilter;
+import teetime.framework.core.Analysis;
+
+/**
+ * @author Christian Wulf
+ * 
+ * @since 1.10
+ */
+public class MethodCallThroughputAnalysis14 extends Analysis {
+
+	private long numInputObjects;
+	private Callable<TimestampObject> inputObjectCreator;
+	private int numNoopFilters;
+	private List<TimestampObject> timestampObjects;
+	private Runnable runnable;
+
+	@Override
+	public void init() {
+		super.init();
+		this.runnable = this.buildPipeline();
+	}
+
+	/**
+	 * @param numNoopFilters
+	 * @since 1.10
+	 */
+	private Runnable buildPipeline() {
+		@SuppressWarnings("unchecked")
+		final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
+		// create stages
+		final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(this.numInputObjects, this.inputObjectCreator);
+		final StartTimestampFilter startTimestampFilter = new StartTimestampFilter();
+		for (int i = 0; i < noopFilters.length; i++) {
+			noopFilters[i] = new NoopFilter<TimestampObject>();
+		}
+		final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
+		final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects);
+
+		final Pipeline<Void, Object> pipeline = new Pipeline<Void, Object>();
+		pipeline.setFirstStage(objectProducer);
+		pipeline.addIntermediateStage(startTimestampFilter);
+		pipeline.addIntermediateStages(noopFilters);
+		pipeline.addIntermediateStage(stopTimestampFilter);
+		pipeline.setLastStage(collectorSink);
+
+		SpScPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
+		SpScPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
+		for (int i = 0; i < noopFilters.length - 1; i++) {
+			SpScPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
+		}
+		SpScPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
+		SpScPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
+
+		pipeline.onStart();
+
+		// pipeline.getInputPort().pipe = new Pipe<Void>();
+		// pipeline.getInputPort().pipe.add(new Object());
+
+		// pipeline.getOutputPort().pipe = new Pipe<Void>();
+
+		final Runnable runnable = new Runnable() {
+			@Override
+			public void run() {
+				do {
+					pipeline.executeWithPorts();
+				} while (pipeline.getSchedulingInformation().isActive() && pipeline.isReschedulable());
+			}
+		};
+
+		return runnable;
+	}
+
+	@Override
+	public void start() {
+		super.start();
+		this.runnable.run();
+	}
+
+	public void setInput(final int numInputObjects, final Callable<TimestampObject> inputObjectCreator) {
+		this.numInputObjects = numInputObjects;
+		this.inputObjectCreator = inputObjectCreator;
+	}
+
+	public int getNumNoopFilters() {
+		return this.numNoopFilters;
+	}
+
+	public void setNumNoopFilters(final int numNoopFilters) {
+		this.numNoopFilters = numNoopFilters;
+	}
+
+	public List<TimestampObject> getTimestampObjects() {
+		return this.timestampObjects;
+	}
+
+	public void setTimestampObjects(final List<TimestampObject> timestampObjects) {
+		this.timestampObjects = timestampObjects;
+	}
+}
diff --git a/src/test/java/teetime/examples/throughput/methodcall/SpScPipe.java b/src/test/java/teetime/examples/throughput/methodcall/SpScPipe.java
new file mode 100644
index 00000000..f499a237
--- /dev/null
+++ b/src/test/java/teetime/examples/throughput/methodcall/SpScPipe.java
@@ -0,0 +1,40 @@
+package teetime.examples.throughput.methodcall;
+
+import teetime.util.concurrent.spsc.FFBufferOrdered3;
+
+public class SpScPipe<T> implements IPipe<T> {
+
+	private final FFBufferOrdered3<T> queue = new FFBufferOrdered3<T>(4);
+
+	public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
+		IPipe<T> pipe = new SpScPipe<T>();
+		sourcePort.pipe = pipe;
+		targetPort.pipe = pipe;
+	}
+
+	@Override
+	public void add(final T element) {
+		this.queue.offer(element);
+	}
+
+	@Override
+	public T removeLast() {
+		return this.queue.poll();
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return this.queue.isEmpty();
+	}
+
+	@Override
+	public int size() {
+		return this.queue.size();
+	}
+
+	@Override
+	public T readLast() {
+		return this.queue.peek();
+	}
+
+}
-- 
GitLab