From fb3337778c295a3952908be8dc9f0433a93c8c66 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Fri, 5 Sep 2014 03:45:06 +0200
Subject: [PATCH] worked on setting attributes in AbstractStage to final

---
 .../framework/core/InputPort.java             |  1 -
 .../framework/core/pipe/AbstractPipe.java     | 21 +++++++++++++------
 .../framework/core/pipe/CommittablePipe.java  |  6 +++++-
 .../framework/core/pipe/DummyPipe.java        |  3 ---
 .../framework/core/pipe/IPipe.java            |  3 +--
 .../framework/core/pipe/IPipeFactory.java     |  7 +++++++
 .../framework/core/pipe/InterThreadPipe.java  |  6 ++++++
 .../framework/core/pipe/IntraThreadPipe.java  |  8 ++++++-
 .../core/pipe/OrderedGrowableArrayPipe.java   | 11 ++++------
 .../pipe/OrderedGrowableArrayPipeFactory.java | 17 +++++++++++----
 .../core/pipe/OrderedGrowablePipe.java        |  9 +++-----
 .../framework/core/pipe/PipeFactory.java      | 18 +++++++++-------
 .../framework/core/pipe/RelayTestPipe.java    |  4 ++--
 .../core/pipe/SingleElementPipe.java          |  6 +++---
 .../core/pipe/SingleElementPipeFactory.java   | 17 ++++++++++++++-
 .../framework/core/pipe/SpScPipe.java         |  8 +++----
 .../framework/core/pipe/SpScPipeFactory.java  | 15 ++++++++++++-
 .../core/pipe/UnorderedGrowablePipe.java      | 10 ++++-----
 .../pipe/UnorderedGrowablePipeFactory.java    | 14 ++++++++++++-
 19 files changed, 128 insertions(+), 56 deletions(-)

diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java
index 02d74d49..be4ed828 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java
@@ -31,7 +31,6 @@ public class InputPort<T> extends AbstractPort<T> {
 	@Override
 	public void setPipe(final IPipe pipe) {
 		this.pipe = pipe;
-		pipe.setTargetPort(this);
 	}
 
 	public StageWithPort getOwningStage() {
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java
index e222c257..d34fd860 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java
@@ -17,21 +17,30 @@ public abstract class AbstractPipe implements IPipe {
 	 */
 	protected StageWithPort cachedTargetStage;
 
-	@Override
-	public InputPort<?> getTargetPort() {
-		return this.targetPort;
+	protected <T> AbstractPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+		this.targetPort = targetPort;
+		if (null != targetPort) { // BETTER remove this check if migration is completed
+			this.cachedTargetStage = targetPort.getOwningStage();
+		}
+		if (null != sourcePort) { // BETTER remove this check if migration is completed
+			sourcePort.setPipe(this);
+		}
+		if (null != targetPort) { // BETTER remove this check if migration is completed
+			targetPort.setPipe(this);
+		}
 	}
 
 	@Override
-	public void setTargetPort(final InputPort<?> targetPort) {
-		this.targetPort = targetPort;
-		this.cachedTargetStage = targetPort.getOwningStage();
+	public InputPort<?> getTargetPort() {
+		return this.targetPort;
 	}
 
 	@Override
 	public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
 		sourcePort.setPipe(this);
 		targetPort.setPipe(this);
+		this.targetPort = targetPort;
+		this.cachedTargetStage = targetPort.getOwningStage();
 	}
 
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java
index 9cf15055..2badc2ea 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java
@@ -8,9 +8,13 @@ public final class CommittablePipe extends IntraThreadPipe {
 
 	private final CommittableResizableArrayQueue<Object> elements = new CommittableResizableArrayQueue<Object>(null, 4);
 
+	<T> CommittablePipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+		super(sourcePort, targetPort);
+	}
+
 	@Deprecated
 	public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
-		IPipe pipe = new CommittablePipe();
+		IPipe pipe = new CommittablePipe(null, null);
 		pipe.connectPorts(sourcePort, targetPort);
 	}
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java
index 4c8c195f..26888277 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java
@@ -43,9 +43,6 @@ public final class DummyPipe implements IPipe {
 		return null;
 	}
 
-	@Override
-	public void setTargetPort(final InputPort targetPort) {}
-
 	@Override
 	public void setSignal(final Signal signal) {}
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java
index 28c43332..a89a3c68 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java
@@ -18,10 +18,9 @@ public interface IPipe {
 
 	InputPort<?> getTargetPort();
 
-	void setTargetPort(InputPort<?> targetPort);
-
 	void setSignal(Signal signal);
 
+	@Deprecated
 	<T> void connectPorts(OutputPort<? extends T> sourcePort, InputPort<T> targetPort);
 
 	void reportNewElement();
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java
index 382fe5f2..da4bbd44 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java
@@ -1,12 +1,19 @@
 package teetime.variant.methodcallWithPorts.framework.core.pipe;
 
+import teetime.variant.methodcallWithPorts.framework.core.InputPort;
+import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
 
 public interface IPipeFactory {
 
+	@Deprecated
 	IPipe create(int capacity);
 
+	<T> IPipe create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort);
+
+	<T> IPipe create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort, int capacity);
+
 	ThreadCommunication getThreadCommunication();
 
 	PipeOrdering getOrdering();
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java
index 94c562eb..37ad7022 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java
@@ -2,12 +2,18 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
 
 import java.util.concurrent.atomic.AtomicReference;
 
+import teetime.variant.methodcallWithPorts.framework.core.InputPort;
+import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
 
 public abstract class InterThreadPipe extends AbstractPipe {
 
 	private final AtomicReference<Signal> signal = new AtomicReference<Signal>();
 
+	<T> InterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+		super(sourcePort, targetPort);
+	}
+
 	@Override
 	public void setSignal(final Signal signal) {
 		this.signal.lazySet(signal); // lazySet is legal due to our single-writer requirement
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java
index 874663cc..04857a9e 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java
@@ -1,12 +1,18 @@
 package teetime.variant.methodcallWithPorts.framework.core.pipe;
 
+import teetime.variant.methodcallWithPorts.framework.core.InputPort;
+import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
 
 public abstract class IntraThreadPipe extends AbstractPipe {
 
+	<T> IntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+		super(sourcePort, targetPort);
+	}
+
 	@Override
 	public void setSignal(final Signal signal) {
-		if (this.getTargetPort() != null) {
+		if (this.getTargetPort() != null) { // BETTER remove this check since there are DummyPorts
 			this.cachedTargetStage.onSignal(signal, this.getTargetPort());
 		}
 	}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
index f1944529..e059acd4 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java
@@ -10,17 +10,14 @@ public final class OrderedGrowableArrayPipe extends IntraThreadPipe {
 	private int head;
 	private int tail;
 
-	public OrderedGrowableArrayPipe() {
-		this(1);
-	}
-
-	public OrderedGrowableArrayPipe(final int initialCapacity) {
-		this.elements = new CircularArray<Object>(initialCapacity);
+	<T> OrderedGrowableArrayPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		super(sourcePort, targetPort);
+		this.elements = new CircularArray<Object>(capacity);
 	}
 
 	@Deprecated
 	public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
-		IPipe pipe = new OrderedGrowableArrayPipe();
+		IPipe pipe = new OrderedGrowableArrayPipe(sourcePort, targetPort, 4);
 		pipe.connectPorts(sourcePort, targetPort);
 	}
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java
index 86a00ca1..7290a782 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java
@@ -1,16 +1,25 @@
 package teetime.variant.methodcallWithPorts.framework.core.pipe;
 
+import teetime.variant.methodcallWithPorts.framework.core.InputPort;
+import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
 
 public class OrderedGrowableArrayPipeFactory implements IPipeFactory {
 
-	/**
-	 * Hint: The capacity for this pipe implementation is ignored
-	 */
 	@Override
 	public IPipe create(final int capacity) {
-		return new OrderedGrowableArrayPipe();
+		return create(null, null, capacity);
+	}
+
+	@Override
+	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+		return create(sourcePort, targetPort, 4);
+	}
+
+	@Override
+	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		return new OrderedGrowableArrayPipe(sourcePort, targetPort, capacity);
 	}
 
 	@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java
index d63a5f86..eab5df8f 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java
@@ -9,17 +9,14 @@ public class OrderedGrowablePipe extends IntraThreadPipe {
 
 	private final LinkedList<Object> elements;
 
-	public OrderedGrowablePipe() {
-		this(100000);
-	}
-
-	public OrderedGrowablePipe(final int initialCapacity) {
+	<T> OrderedGrowablePipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		super(sourcePort, targetPort);
 		this.elements = new LinkedList<Object>();
 	}
 
 	@Deprecated
 	public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
-		IPipe pipe = new OrderedGrowablePipe();
+		IPipe pipe = new OrderedGrowablePipe(null, null, 100000);
 		pipe.connectPorts(sourcePort, targetPort);
 	}
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java
index d876a385..20fbec00 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java
@@ -55,16 +55,17 @@ public class PipeFactory {
 	}
 
 	public IPipe create(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable, final int capacity) {
+		IPipeFactory pipeFactory = getPipeFactory(tc, ordering, growable);
+		return pipeFactory.create(capacity);
+	}
+
+	public IPipeFactory getPipeFactory(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) {
 		String key = this.buildKey(tc, ordering, growable);
-		IPipeFactory pipeClass = this.pipeFactories.get(key);
-		if (null == pipeClass) {
+		IPipeFactory pipeFactory = this.pipeFactories.get(key);
+		if (null == pipeFactory) {
 			throw new CouldNotFindPipeImplException(key);
 		}
-		return pipeClass.create(capacity);
-	}
-
-	private String buildKey(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) {
-		return tc.toString() + ordering.toString() + growable;
+		return pipeFactory;
 	}
 
 	public void register(final IPipeFactory pipeFactory) {
@@ -73,4 +74,7 @@ public class PipeFactory {
 		LOGGER.info("Registered pipe factory: " + pipeFactory.getClass().getCanonicalName());
 	}
 
+	private String buildKey(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) {
+		return tc.toString() + ordering.toString() + growable;
+	}
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java
index bbd11753..26e8da05 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java
@@ -7,8 +7,8 @@ public final class RelayTestPipe<T> extends InterThreadPipe {
 	private int numInputObjects;
 	private final ConstructorClosure<T> inputObjectCreator;
 
-	public RelayTestPipe(final int numInputObjects,
-			final ConstructorClosure<T> inputObjectCreator) {
+	public RelayTestPipe(final int numInputObjects, final ConstructorClosure<T> inputObjectCreator) {
+		super(null, null);
 		this.numInputObjects = numInputObjects;
 		this.inputObjectCreator = inputObjectCreator;
 	}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
index 73cba774..f40eca65 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java
@@ -7,13 +7,13 @@ public final class SingleElementPipe extends IntraThreadPipe {
 
 	private Object element;
 
-	SingleElementPipe() {
-		super();
+	<T> SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+		super(sourcePort, targetPort);
 	}
 
 	@Deprecated
 	public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
-		IPipe pipe = new SingleElementPipe();
+		IPipe pipe = new SingleElementPipe(null, null);
 		pipe.connectPorts(sourcePort, targetPort);
 	}
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java
index fcbde3f4..22309e35 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java
@@ -1,5 +1,7 @@
 package teetime.variant.methodcallWithPorts.framework.core.pipe;
 
+import teetime.variant.methodcallWithPorts.framework.core.InputPort;
+import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
 
@@ -10,7 +12,20 @@ public class SingleElementPipeFactory implements IPipeFactory {
 	 */
 	@Override
 	public IPipe create(final int capacity) {
-		return new SingleElementPipe();
+		return create(null, null);
+	}
+
+	@Override
+	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+		return create(sourcePort, targetPort, 1);
+	}
+
+	/**
+	 * Hint: The capacity for this pipe implementation is ignored
+	 */
+	@Override
+	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		return new SingleElementPipe(sourcePort, targetPort);
 	}
 
 	@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
index 4ac039e4..acb1cc62 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
@@ -16,14 +16,14 @@ public final class SpScPipe extends InterThreadPipe {
 	// statistics
 	private int numWaits;
 
-	SpScPipe(final int capacity) {
-		ConcurrentQueueSpec concurrentQueueSpec = new ConcurrentQueueSpec(1, 1, capacity, Ordering.FIFO, Preference.THROUGHPUT);
-		this.queue = QueueFactory.newQueue(concurrentQueueSpec);
+	<T> SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		super(sourcePort, targetPort);
+		this.queue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, capacity, Ordering.FIFO, Preference.THROUGHPUT));
 	}
 
 	@Deprecated
 	public static <T> SpScPipe connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
-		SpScPipe pipe = new SpScPipe(capacity);
+		SpScPipe pipe = new SpScPipe(sourcePort, targetPort, capacity);
 		pipe.connectPorts(sourcePort, targetPort);
 		return pipe;
 	}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java
index d81ade9d..fa0686e2 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java
@@ -1,5 +1,7 @@
 package teetime.variant.methodcallWithPorts.framework.core.pipe;
 
+import teetime.variant.methodcallWithPorts.framework.core.InputPort;
+import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
 
@@ -7,7 +9,17 @@ public class SpScPipeFactory implements IPipeFactory {
 
 	@Override
 	public IPipe create(final int capacity) {
-		return new SpScPipe(capacity);
+		return create(null, null, capacity);
+	}
+
+	@Override
+	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+		return create(sourcePort, targetPort, 4);
+	}
+
+	@Override
+	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		return new SpScPipe(sourcePort, targetPort, capacity);
 	}
 
 	@Override
@@ -24,4 +36,5 @@ public class SpScPipeFactory implements IPipeFactory {
 	public boolean isGrowable() {
 		return false;
 	}
+
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
index fbbd079c..878844e2 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java
@@ -5,20 +5,18 @@ import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 
 public final class UnorderedGrowablePipe extends IntraThreadPipe {
 
-	private final int MIN_CAPACITY;
-
 	private Object[] elements;
 	// private final ArrayWrapper2<T> elements = new ArrayWrapper2<T>(2);
 	private int lastFreeIndex;
 
-	UnorderedGrowablePipe() {
-		this.MIN_CAPACITY = 4;
-		this.elements = new Object[this.MIN_CAPACITY];
+	<T> UnorderedGrowablePipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		super(sourcePort, targetPort);
+		this.elements = new Object[capacity];
 	}
 
 	@Deprecated
 	public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
-		IPipe pipe = new UnorderedGrowablePipe();
+		IPipe pipe = new UnorderedGrowablePipe(null, null, 4);
 		pipe.connectPorts(sourcePort, targetPort);
 	}
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java
index c6d1b87f..536efc0c 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java
@@ -1,5 +1,7 @@
 package teetime.variant.methodcallWithPorts.framework.core.pipe;
 
+import teetime.variant.methodcallWithPorts.framework.core.InputPort;
+import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
 
@@ -10,7 +12,17 @@ public class UnorderedGrowablePipeFactory implements IPipeFactory {
 	 */
 	@Override
 	public IPipe create(final int capacity) {
-		return new UnorderedGrowablePipe();
+		return create(null, null, capacity);
+	}
+
+	@Override
+	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+		return create(sourcePort, targetPort, 4);
+	}
+
+	@Override
+	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		return new UnorderedGrowablePipe(sourcePort, targetPort, capacity);
 	}
 
 	@Override
-- 
GitLab