From d53f3e5bab97e15e228a30972ca2fa4fe515e381 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Fri, 31 Jul 2015 11:05:51 +0200
Subject: [PATCH] introduced type parameter to pipe

---
 .../framework/AbstractCompositeStage.java     |  2 +-
 .../framework/AbstractInterThreadPipe.java    |  4 ++--
 .../framework/AbstractIntraThreadPipe.java    |  4 ++--
 .../java/teetime/framework/AbstractPipe.java  | 12 ++++++------
 .../java/teetime/framework/Execution.java     |  3 ++-
 .../framework/ExecutionInstantiation.java     | 19 +++++++++----------
 .../java/teetime/framework/IPipeVisitor.java  |  2 +-
 .../framework/IntraStageCollector.java        |  2 +-
 .../teetime/framework/pipe/DummyPipe.java     |  6 +++---
 .../java/teetime/framework/pipe/IPipe.java    |  6 +++---
 .../teetime/framework/pipe/IPipeFactory.java  |  4 ++--
 .../framework/pipe/InstantiationPipe.java     | 13 +++++++------
 .../teetime/framework/pipe/RelayTestPipe.java |  2 +-
 .../framework/pipe/SingleElementPipe.java     |  4 ++--
 .../pipe/SingleElementPipeFactory.java        |  6 +++---
 .../java/teetime/framework/pipe/SpScPipe.java |  4 ++--
 .../framework/pipe/SpScPipeFactory.java       |  6 +++---
 .../framework/pipe/UnboundedSpScPipe.java     |  4 ++--
 .../pipe/UnboundedSpScPipeFactory.java        |  6 +++---
 src/test/resources/data/output.txt            | 11 -----------
 20 files changed, 55 insertions(+), 65 deletions(-)

diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java
index 0c343df1..2a1efcf0 100644
--- a/src/main/java/teetime/framework/AbstractCompositeStage.java
+++ b/src/main/java/teetime/framework/AbstractCompositeStage.java
@@ -112,7 +112,7 @@ public abstract class AbstractCompositeStage {
 		// addChildContext(sourcePort.getOwningStage());
 		// addChildContext(targetPort.getOwningStage());
 
-		new InstantiationPipe(sourcePort, targetPort, capacity);
+		new InstantiationPipe<T>(sourcePort, targetPort, capacity);
 	}
 
 }
diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java
index 8786ef85..b3f25a75 100644
--- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java
+++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java
@@ -32,13 +32,13 @@ import teetime.util.framework.concurrent.queue.putstrategy.YieldPutStrategy;
 import teetime.util.framework.concurrent.queue.takestrategy.SCParkTakeStrategy;
 import teetime.util.framework.concurrent.queue.takestrategy.TakeStrategy;
 
-public abstract class AbstractInterThreadPipe extends AbstractPipe {
+public abstract class AbstractInterThreadPipe<T> extends AbstractPipe<T> {
 
 	private final BlockingQueue<ISignal> signalQueue;
 
 	private volatile boolean closed;
 
-	protected <T> AbstractInterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+	protected AbstractInterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
 		super(sourcePort, targetPort, capacity);
 		final Queue<ISignal> localSignalQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
 		final PutStrategy<ISignal> putStrategy = new YieldPutStrategy<ISignal>();
diff --git a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java
index a12acf71..2be7f3e3 100644
--- a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java
+++ b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java
@@ -17,11 +17,11 @@ package teetime.framework;
 
 import teetime.framework.signal.ISignal;
 
-public abstract class AbstractIntraThreadPipe extends AbstractPipe {
+public abstract class AbstractIntraThreadPipe<T> extends AbstractPipe<T> {
 
 	private boolean closed;
 
-	protected <T> AbstractIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+	protected AbstractIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
 		super(sourcePort, targetPort, capacity);
 	}
 
diff --git a/src/main/java/teetime/framework/AbstractPipe.java b/src/main/java/teetime/framework/AbstractPipe.java
index 2a6ea3bb..31b3fbd6 100644
--- a/src/main/java/teetime/framework/AbstractPipe.java
+++ b/src/main/java/teetime/framework/AbstractPipe.java
@@ -17,7 +17,7 @@ package teetime.framework;
 
 import teetime.framework.pipe.IPipe;
 
-public abstract class AbstractPipe implements IPipe {
+public abstract class AbstractPipe<T> implements IPipe<T> {
 
 	/**
 	 * Performance cache: Avoids the following method chain
@@ -28,12 +28,12 @@ public abstract class AbstractPipe implements IPipe {
 	 */
 	protected final Stage cachedTargetStage;
 
-	private final OutputPort<?> sourcePort;
-	private final InputPort<?> targetPort;
+	private final OutputPort<? extends T> sourcePort;
+	private final InputPort<T> targetPort;
 	@SuppressWarnings("PMD.AvoidFieldNameMatchingMethodName")
 	private final int capacity;
 
-	protected <T> AbstractPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+	protected AbstractPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
 		if (sourcePort == null) {
 			throw new IllegalArgumentException("sourcePort may not be null");
 		}
@@ -51,12 +51,12 @@ public abstract class AbstractPipe implements IPipe {
 	}
 
 	@Override
-	public final OutputPort<?> getSourcePort() {
+	public final OutputPort<? extends T> getSourcePort() {
 		return sourcePort;
 	}
 
 	@Override
-	public final InputPort<?> getTargetPort() {
+	public final InputPort<T> getTargetPort() {
 		return targetPort;
 	}
 
diff --git a/src/main/java/teetime/framework/Execution.java b/src/main/java/teetime/framework/Execution.java
index 88dbb2df..3e716268 100644
--- a/src/main/java/teetime/framework/Execution.java
+++ b/src/main/java/teetime/framework/Execution.java
@@ -20,6 +20,7 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import teetime.framework.Traversor.Direction;
 import teetime.framework.signal.ValidatingSignal;
 import teetime.framework.validation.AnalysisNotValidException;
 
@@ -103,7 +104,7 @@ public final class Execution<T extends Configuration> {
 		executionInstantiation.instantiatePipes();
 
 		IPipeVisitor pipeVisitor = new StageCollector();
-		Traversor traversor = new Traversor(pipeVisitor);
+		Traversor traversor = new Traversor(pipeVisitor, Direction.BOTH);
 		// TODO iterate through each producer
 		// traversor.traverse(stage);
 
diff --git a/src/main/java/teetime/framework/ExecutionInstantiation.java b/src/main/java/teetime/framework/ExecutionInstantiation.java
index abcf9599..4334cf42 100644
--- a/src/main/java/teetime/framework/ExecutionInstantiation.java
+++ b/src/main/java/teetime/framework/ExecutionInstantiation.java
@@ -34,19 +34,19 @@ class ExecutionInstantiation {
 
 	private final ConfigurationContext context;
 
-	public ExecutionInstantiation(final ConfigurationContext configuration) {
-		this.context = configuration;
+	public ExecutionInstantiation(final ConfigurationContext context) {
+		this.context = context;
 	}
 
 	void instantiatePipes() {
 		int color = DEFAULT_COLOR;
 		Map<Stage, Integer> colors = new HashMap<Stage, Integer>();
-		Set<Stage> threadableStageJobs = context.getThreadableStages().keySet();
-		for (Stage threadableStage : threadableStageJobs) {
+		Set<Stage> threadableStages = context.getThreadableStages().keySet();
+		for (Stage threadableStage : threadableStages) {
 			color++;
 			colors.put(threadableStage, color);
 
-			ThreadPainter threadPainter = new ThreadPainter(colors, color, context);
+			ThreadPainter threadPainter = new ThreadPainter(colors, color, threadableStages);
 			threadPainter.colorAndConnectStages(threadableStage);
 		}
 	}
@@ -55,13 +55,13 @@ class ExecutionInstantiation {
 
 		private final Map<Stage, Integer> colors;
 		private final int color;
-		private final ConfigurationContext context;
+		private final Set<Stage> threadableStages;
 
-		public ThreadPainter(final Map<Stage, Integer> colors, final int color, final ConfigurationContext context) {
+		public ThreadPainter(final Map<Stage, Integer> colors, final int color, final Set<Stage> threadableStages) {
 			super();
 			this.colors = colors;
 			this.color = color;
-			this.context = context;
+			this.threadableStages = threadableStages;
 		}
 
 		public int colorAndConnectStages(final Stage stage) {
@@ -69,7 +69,7 @@ class ExecutionInstantiation {
 
 			for (OutputPort<?> outputPort : stage.getOutputPorts()) {
 				if (outputPort.pipe != null && outputPort.pipe instanceof InstantiationPipe) {
-					InstantiationPipe pipe = (InstantiationPipe) outputPort.pipe;
+					InstantiationPipe<?> pipe = (InstantiationPipe<?>) outputPort.pipe;
 					createdConnections += processPipe(outputPort, pipe);
 					createdConnections++;
 				}
@@ -80,7 +80,6 @@ class ExecutionInstantiation {
 
 		@SuppressWarnings({ "rawtypes", "unchecked" })
 		private int processPipe(final OutputPort outputPort, final InstantiationPipe pipe) {
-			Set<Stage> threadableStages = context.getThreadableStages().keySet();
 			int numCreatedConnections;
 
 			Stage targetStage = pipe.getTargetPort().getOwningStage();
diff --git a/src/main/java/teetime/framework/IPipeVisitor.java b/src/main/java/teetime/framework/IPipeVisitor.java
index bc4da464..9b884a7a 100644
--- a/src/main/java/teetime/framework/IPipeVisitor.java
+++ b/src/main/java/teetime/framework/IPipeVisitor.java
@@ -23,6 +23,6 @@ public interface IPipeVisitor {
 		CONTINUE, STOP
 	}
 
-	VisitorBehavior visit(IPipe outputPipe);
+	VisitorBehavior visit(IPipe<?> pipe);
 
 }
diff --git a/src/main/java/teetime/framework/IntraStageCollector.java b/src/main/java/teetime/framework/IntraStageCollector.java
index 3bc01fa6..cd0fbede 100644
--- a/src/main/java/teetime/framework/IntraStageCollector.java
+++ b/src/main/java/teetime/framework/IntraStageCollector.java
@@ -22,7 +22,7 @@ public class IntraStageCollector implements IPipeVisitor {
 	public IntraStageCollector() {}
 
 	@Override
-	public VisitorBehavior visit(final IPipe outputPipe) {
+	public VisitorBehavior visit(final IPipe<?> outputPipe) {
 		if (outputPipe instanceof AbstractIntraThreadPipe) {
 			return VisitorBehavior.CONTINUE;
 		}
diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java
index 8381869e..24809109 100644
--- a/src/main/java/teetime/framework/pipe/DummyPipe.java
+++ b/src/main/java/teetime/framework/pipe/DummyPipe.java
@@ -25,7 +25,7 @@ import teetime.framework.signal.ISignal;
  * @author Christian Wulf
  *
  */
-public final class DummyPipe implements IPipe {
+public final class DummyPipe<T> implements IPipe<T> {
 
 	@Override
 	public boolean add(final Object element) {
@@ -53,12 +53,12 @@ public final class DummyPipe implements IPipe {
 	}
 
 	@Override
-	public OutputPort<?> getSourcePort() {
+	public OutputPort<? extends T> getSourcePort() {
 		return null;
 	}
 
 	@Override
-	public InputPort<Object> getTargetPort() {
+	public InputPort<T> getTargetPort() {
 		return null;
 	}
 
diff --git a/src/main/java/teetime/framework/pipe/IPipe.java b/src/main/java/teetime/framework/pipe/IPipe.java
index de1cd908..7610bd7d 100644
--- a/src/main/java/teetime/framework/pipe/IPipe.java
+++ b/src/main/java/teetime/framework/pipe/IPipe.java
@@ -22,7 +22,7 @@ import teetime.framework.signal.ISignal;
 /**
  * Represents a pipe that connects an output port with an input port.
  */
-public interface IPipe {
+public interface IPipe<T> {
 
 	/**
 	 * Adds an element to the Pipe.
@@ -69,12 +69,12 @@ public interface IPipe {
 	/**
 	 * @return the output port that is connected to the pipe.
 	 */
-	OutputPort<?> getSourcePort();
+	OutputPort<? extends T> getSourcePort();
 
 	/**
 	 * @return the input port that is connected to the pipe.
 	 */
-	InputPort<?> getTargetPort();
+	InputPort<T> getTargetPort();
 
 	/**
 	 * A stage can pass on a signal by executing this method. The signal will be sent to the receiving stage.
diff --git a/src/main/java/teetime/framework/pipe/IPipeFactory.java b/src/main/java/teetime/framework/pipe/IPipeFactory.java
index 23b8fe0c..cd18675d 100644
--- a/src/main/java/teetime/framework/pipe/IPipeFactory.java
+++ b/src/main/java/teetime/framework/pipe/IPipeFactory.java
@@ -35,7 +35,7 @@ public interface IPipeFactory {
 	 *
 	 * @return The connecting pipe.
 	 */
-	<T> IPipe create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort);
+	<T> IPipe<T> create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort);
 
 	/**
 	 * Connects two stages with a pipe.
@@ -50,7 +50,7 @@ public interface IPipeFactory {
 	 *            type of elements which traverse this pipe
 	 * @return The connecting pipe.
 	 */
-	<T> IPipe create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort, int capacity);
+	<T> IPipe<T> create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort, int capacity);
 
 	/**
 	 * @return Whether or not the created pipes are growable
diff --git a/src/main/java/teetime/framework/pipe/InstantiationPipe.java b/src/main/java/teetime/framework/pipe/InstantiationPipe.java
index ab02f62d..b0cae7f6 100644
--- a/src/main/java/teetime/framework/pipe/InstantiationPipe.java
+++ b/src/main/java/teetime/framework/pipe/InstantiationPipe.java
@@ -19,15 +19,16 @@ import teetime.framework.InputPort;
 import teetime.framework.OutputPort;
 import teetime.framework.signal.ISignal;
 
-public class InstantiationPipe implements IPipe {
+public class InstantiationPipe<T> implements IPipe<T> {
 
 	private static final String ERROR_MESSAGE = "This must not be called while executing the configuration";
 
-	private final OutputPort<?> sourcePort;
-	private final InputPort<?> targetPort;
+	private final OutputPort<? extends T> sourcePort;
+	private final InputPort<T> targetPort;
+	@SuppressWarnings("PMD.AvoidFieldNameMatchingMethodName")
 	private final int capacity;
 
-	public <T> InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+	public InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
 		this.sourcePort = sourcePort;
 		this.targetPort = targetPort;
 		this.capacity = capacity;
@@ -41,12 +42,12 @@ public class InstantiationPipe implements IPipe {
 	}
 
 	@Override
-	public OutputPort<?> getSourcePort() {
+	public OutputPort<? extends T> getSourcePort() {
 		return sourcePort;
 	}
 
 	@Override
-	public InputPort<?> getTargetPort() {
+	public InputPort<T> getTargetPort() {
 		return targetPort;
 	}
 
diff --git a/src/main/java/teetime/framework/pipe/RelayTestPipe.java b/src/main/java/teetime/framework/pipe/RelayTestPipe.java
index 20e99adb..34f1d57d 100644
--- a/src/main/java/teetime/framework/pipe/RelayTestPipe.java
+++ b/src/main/java/teetime/framework/pipe/RelayTestPipe.java
@@ -18,7 +18,7 @@ package teetime.framework.pipe;
 import teetime.framework.AbstractInterThreadPipe;
 import teetime.util.ConstructorClosure;
 
-final class RelayTestPipe<T> extends AbstractInterThreadPipe {
+final class RelayTestPipe<T> extends AbstractInterThreadPipe<T> {
 
 	private int numInputObjects;
 	private final ConstructorClosure<T> inputObjectCreator;
diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipe.java b/src/main/java/teetime/framework/pipe/SingleElementPipe.java
index 655b9b5f..470c8330 100644
--- a/src/main/java/teetime/framework/pipe/SingleElementPipe.java
+++ b/src/main/java/teetime/framework/pipe/SingleElementPipe.java
@@ -19,11 +19,11 @@ import teetime.framework.AbstractIntraThreadPipe;
 import teetime.framework.InputPort;
 import teetime.framework.OutputPort;
 
-final class SingleElementPipe extends AbstractIntraThreadPipe {
+final class SingleElementPipe<T> extends AbstractIntraThreadPipe<T> {
 
 	private Object element;
 
-	<T> SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+	SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
 		super(sourcePort, targetPort, 1);
 	}
 
diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java b/src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java
index 7e4c53ce..62de0583 100644
--- a/src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java
+++ b/src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java
@@ -21,7 +21,7 @@ import teetime.framework.OutputPort;
 public final class SingleElementPipeFactory implements IPipeFactory {
 
 	@Override
-	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+	public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
 		return this.create(sourcePort, targetPort, 1);
 	}
 
@@ -31,8 +31,8 @@ public final class SingleElementPipeFactory implements IPipeFactory {
 	 * {@inheritDoc}
 	 */
 	@Override
-	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
-		return new SingleElementPipe(sourcePort, targetPort);
+	public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		return new SingleElementPipe<T>(sourcePort, targetPort);
 	}
 
 	@Override
diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java
index 2937fb6d..73696bc8 100644
--- a/src/main/java/teetime/framework/pipe/SpScPipe.java
+++ b/src/main/java/teetime/framework/pipe/SpScPipe.java
@@ -22,7 +22,7 @@ import teetime.framework.StageState;
 import teetime.framework.exceptionHandling.TerminateException;
 import teetime.util.framework.concurrent.queue.ObservableSpScArrayQueue;
 
-final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe {
+final class SpScPipe<T> extends AbstractInterThreadPipe<T> implements IMonitorablePipe {
 
 	// private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class);
 
@@ -30,7 +30,7 @@ final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe
 	// statistics
 	private int numWaits;
 
-	<T> SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+	SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
 		super(sourcePort, targetPort, capacity);
 		this.queue = new ObservableSpScArrayQueue<Object>(capacity);
 	}
diff --git a/src/main/java/teetime/framework/pipe/SpScPipeFactory.java b/src/main/java/teetime/framework/pipe/SpScPipeFactory.java
index 28350f43..ff0e3e8f 100644
--- a/src/main/java/teetime/framework/pipe/SpScPipeFactory.java
+++ b/src/main/java/teetime/framework/pipe/SpScPipeFactory.java
@@ -21,13 +21,13 @@ import teetime.framework.OutputPort;
 public final class SpScPipeFactory implements IPipeFactory {
 
 	@Override
-	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+	public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
 		return this.create(sourcePort, targetPort, 4);
 	}
 
 	@Override
-	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
-		return new SpScPipe(sourcePort, targetPort, capacity);
+	public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		return new SpScPipe<T>(sourcePort, targetPort, capacity);
 	}
 
 	@Override
diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
index 954c5918..19dfb387 100644
--- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
+++ b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
@@ -26,11 +26,11 @@ import teetime.framework.AbstractInterThreadPipe;
 import teetime.framework.InputPort;
 import teetime.framework.OutputPort;
 
-final class UnboundedSpScPipe extends AbstractInterThreadPipe {
+final class UnboundedSpScPipe<T> extends AbstractInterThreadPipe<T> {
 
 	private final Queue<Object> queue;
 
-	<T> UnboundedSpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+	UnboundedSpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
 		super(sourcePort, targetPort, Integer.MAX_VALUE);
 		ConcurrentQueueSpec specification = new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT);
 		this.queue = QueueFactory.newQueue(specification);
diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java
index 1b3b2f9d..442e46f7 100644
--- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java
+++ b/src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java
@@ -21,7 +21,7 @@ import teetime.framework.OutputPort;
 public class UnboundedSpScPipeFactory implements IPipeFactory {
 
 	@Override
-	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+	public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
 		return this.create(sourcePort, targetPort, 0);
 	}
 
@@ -31,8 +31,8 @@ public class UnboundedSpScPipeFactory implements IPipeFactory {
 	 * The capacity is ignored.
 	 */
 	@Override
-	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
-		return new UnboundedSpScPipe(sourcePort, targetPort);
+	public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		return new UnboundedSpScPipe<T>(sourcePort, targetPort);
 	}
 
 	@Override
diff --git a/src/test/resources/data/output.txt b/src/test/resources/data/output.txt
index 5c016264..e69de29b 100644
--- a/src/test/resources/data/output.txt
+++ b/src/test/resources/data/output.txt
@@ -1,11 +0,0 @@
-Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
-Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.
-Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi.
-Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat.
-Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis.
-At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores duo eirmod eos erat, et nonumy sed tempor et et invidunt justo labore Stet clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero voluptua. est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat.
-Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus.
-Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
-Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.
-Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi.
-Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo
\ No newline at end of file
-- 
GitLab