diff --git a/.settings/org.eclipse.jdt.ui.prefs b/.settings/org.eclipse.jdt.ui.prefs
index eea88e80e58fa2504b0cff08ef526ec3bcc0740c..1b99f1300a4fa73b7b2710e631c3f5bc42c4c508 100644
--- a/.settings/org.eclipse.jdt.ui.prefs
+++ b/.settings/org.eclipse.jdt.ui.prefs
@@ -45,11 +45,11 @@ cleanup.sort_members_all=false
 cleanup.use_blocks=true
 cleanup.use_blocks_only_for_return_and_throw=false
 cleanup.use_parentheses_in_expressions=true
-cleanup.use_this_for_non_static_field_access=true
+cleanup.use_this_for_non_static_field_access=false
 cleanup.use_this_for_non_static_field_access_only_if_necessary=false
 cleanup.use_this_for_non_static_method_access=true
 cleanup.use_this_for_non_static_method_access_only_if_necessary=false
-cleanup_profile=_Kieker - Clean Up
+cleanup_profile=_TeeTime
 cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractPort.java
new file mode 100644
index 0000000000000000000000000000000000000000..ce3563c62e9ca1124a1f698262e99ffafa5221ca
--- /dev/null
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractPort.java
@@ -0,0 +1,31 @@
+package teetime.variant.methodcallWithPorts.framework.core;
+
+import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
+
+public abstract class AbstractPort<T> {
+
+	protected IPipe<T> pipe;
+	/**
+	 * The type of this port.
+	 * <p>
+	 * <i>Used to validate the connection between two ports at runtime.</i>
+	 * </p>
+	 */
+	protected Class<T> type;
+
+	public IPipe<T> getPipe() {
+		return this.pipe;
+	}
+
+	public void setPipe(final IPipe<T> pipe) {
+		this.pipe = pipe;
+	}
+
+	public Class<T> getType() {
+		return this.type;
+	}
+
+	public void setType(final Class<T> type) {
+		this.type = type;
+	}
+}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
index 65e5f5b7656d6a931588eff63dd16c9a3f722fc0..f50daca15b4c6aaf565af369de88fcb7d39608bc 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
@@ -1,9 +1,12 @@
 package teetime.variant.methodcallWithPorts.framework.core;
 
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
 
+import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
+
 import kieker.common.logging.Log;
 import kieker.common.logging.LogFactory;
 
@@ -118,16 +121,36 @@ public abstract class AbstractStage implements StageWithPort {
 
 	protected <T> InputPort<T> createInputPort() {
 		InputPort<T> inputPort = new InputPort<T>(this);
+		// inputPort.setType(type); // TODO set type for input port
 		this.inputPortList.add(inputPort);
 		return inputPort;
 	}
 
 	protected <T> OutputPort<T> createOutputPort() {
 		OutputPort<T> outputPort = new OutputPort<T>();
+		// outputPort.setType(type); // TODO set type for output port
 		this.outputPortList.add(outputPort);
 		return outputPort;
 	}
 
+	public List<InvalidPortConnection> validateOutputPorts() {
+		List<InvalidPortConnection> invalidOutputPortMessages = new LinkedList<InvalidPortConnection>();
+
+		for (OutputPort<?> outputPort : this.getOutputPorts()) {
+			IPipe<?> pipe = outputPort.getPipe();
+			if (null != pipe) { // if output port is connected with another one
+				Class<?> sourcePortType = outputPort.getType();
+				Class<?> targetPortType = pipe.getTargetPort().getType();
+				if (null == sourcePortType || !sourcePortType.equals(targetPortType)) {
+					InvalidPortConnection invalidPortConnection = new InvalidPortConnection(outputPort, pipe.getTargetPort());
+					invalidOutputPortMessages.add(invalidPortConnection);
+				}
+			}
+		}
+
+		return invalidOutputPortMessages;
+	}
+
 	@Override
 	public String toString() {
 		return this.getClass().getName() + ": " + this.id;
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java
index 85f6ba79bee44fea528c78543e58f676fdbe171b..42b188dcf6ec3c17433a4bab28d91551400e8e5b 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java
@@ -2,10 +2,9 @@ package teetime.variant.methodcallWithPorts.framework.core;
 
 import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
 
-public class InputPort<T> {
+public class InputPort<T> extends AbstractPort<T> {
 
 	private final StageWithPort owningStage;
-	private IPipe<T> pipe;
 
 	InputPort(final StageWithPort owningStage) {
 		super();
@@ -22,15 +21,12 @@ public class InputPort<T> {
 		return element;
 	}
 
-	public IPipe<T> getPipe() {
-		return this.pipe;
-	}
-
 	/**
 	 * Connects this input port with the given <code>pipe</code> bi-directionally
 	 * 
 	 * @param pipe
 	 */
+	@Override
 	public void setPipe(final IPipe<T> pipe) {
 		this.pipe = pipe;
 		pipe.setTargetPort(this);
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InvalidPortConnection.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InvalidPortConnection.java
new file mode 100644
index 0000000000000000000000000000000000000000..d753230de7d44c703f58745720ef69165d126f2d
--- /dev/null
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InvalidPortConnection.java
@@ -0,0 +1,22 @@
+package teetime.variant.methodcallWithPorts.framework.core;
+
+public class InvalidPortConnection {
+
+	private final OutputPort<?> sourcePort;
+	private final InputPort<?> inputPort;
+
+	public InvalidPortConnection(final OutputPort<?> sourcePort, final InputPort<?> inputPort) {
+		super();
+		this.sourcePort = sourcePort;
+		this.inputPort = inputPort;
+	}
+
+	public OutputPort<?> getSourcePort() {
+		return sourcePort;
+	}
+
+	public InputPort<?> getInputPort() {
+		return inputPort;
+	}
+
+}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
index 9fff9078abfe43830445899ceaf502218ea47e4c..0aeab19c89973fec066303a2637875929c7fa769 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
@@ -1,10 +1,7 @@
 package teetime.variant.methodcallWithPorts.framework.core;
 
-import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
+public class OutputPort<T> extends AbstractPort<T> {
 
-public class OutputPort<T> {
-
-	private IPipe<T> pipe;
 	/**
 	 * Performance cache: Avoids the following method chain
 	 * 
@@ -27,14 +24,6 @@ public class OutputPort<T> {
 		return this.pipe.add(element);
 	}
 
-	public IPipe<T> getPipe() {
-		return this.pipe;
-	}
-
-	public void setPipe(final IPipe<T> pipe) {
-		this.pipe = pipe;
-	}
-
 	public StageWithPort getCachedTargetStage() {
 		return this.cachedTargetStage;
 	}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java
index 5b0c55060613704c740dc67a1b8c4a01756214f6..f20f891e025f205d6a46e8f93654088d98ae6058 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java
@@ -8,8 +8,15 @@ import java.util.UUID;
 import kieker.common.logging.Log;
 import kieker.common.logging.LogFactory;
 
+/**
+ * 
+ * @author Christian Wulf
+ * 
+ * @param <FirstStage>
+ * @param <LastStage>
+ */
 // BETTER remove the pipeline since it does not add any new functionality
-public class Pipeline<I, O> implements StageWithPort {
+public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageWithPort> implements StageWithPort {
 
 	private final String id;
 	/**
@@ -17,11 +24,9 @@ public class Pipeline<I, O> implements StageWithPort {
 	 */
 	protected Log logger;
 
-	private StageWithPort firstStage;
-	private InputPort<I> firstStageInputPort;
+	private FirstStage firstStage;
 	private final List<StageWithPort> intermediateStages = new LinkedList<StageWithPort>();
-	private StageWithPort lastStage;
-	private OutputPort<O> lastStageOutputPort;
+	private LastStage lastStage;
 
 	// BETTER remove the stage array and use the output ports instead for passing a signal to all stages in the same thread; what about multiple same signals due to
 	// multiple input ports?
@@ -47,9 +52,8 @@ public class Pipeline<I, O> implements StageWithPort {
 		return this.id;
 	}
 
-	public void setFirstStage(final StageWithPort stage, final InputPort<I> firstStageInputPort) {
+	public void setFirstStage(final FirstStage stage) {
 		this.firstStage = stage;
-		this.firstStageInputPort = firstStageInputPort;
 	}
 
 	public void addIntermediateStages(final StageWithPort... stages) {
@@ -60,9 +64,8 @@ public class Pipeline<I, O> implements StageWithPort {
 		this.intermediateStages.add(stage);
 	}
 
-	public void setLastStage(final StageWithPort stage, final OutputPort<O> lastStageOutputPort) {
+	public void setLastStage(final LastStage stage) {
 		this.lastStage = stage;
-		this.lastStageOutputPort = lastStageOutputPort;
 	}
 
 	@Override
@@ -151,17 +154,17 @@ public class Pipeline<I, O> implements StageWithPort {
 	// this.reschedulable = reschedulable;
 	// }
 
-	public InputPort<I> getInputPort() {
-		return this.firstStageInputPort;
+	@Override
+	public void onSignal(final Signal signal, final InputPort<?> inputPort) {
+		this.firstStage.onSignal(signal, inputPort);
 	}
 
-	public OutputPort<O> getOutputPort() {
-		return this.lastStageOutputPort;
+	public FirstStage getFirstStage() {
+		return this.firstStage;
 	}
 
-	@Override
-	public void onSignal(final Signal signal, final InputPort<?> inputPort) {
-		this.firstStage.onSignal(signal, inputPort);
+	public LastStage getLastStage() {
+		return this.lastStage;
 	}
 
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java
index 47e14bdb8b56ade04d7bc288d31cd4b9201fbd62..6c5e60577a4ffa8d8b1cd8392b0516dae1b511d0 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java
@@ -3,12 +3,12 @@ package teetime.variant.methodcallWithPorts.framework.core;
 import kieker.common.logging.Log;
 import kieker.common.logging.LogFactory;
 
-public class RunnableStage<I> implements Runnable {
+public class RunnableStage implements Runnable {
 
-	private final ConsumerStage<I> stage;
+	private final StageWithPort stage;
 	private final Log logger;
 
-	public RunnableStage(final ConsumerStage<I> stage) {
+	public RunnableStage(final StageWithPort stage) {
 		this.stage = stage;
 		this.logger = LogFactory.getLog(stage.getClass());
 	}
@@ -24,7 +24,7 @@ public class RunnableStage<I> implements Runnable {
 				this.stage.executeWithPorts();
 			} while (this.stage.isReschedulable());
 
-			this.stage.onSignal(Signal.FINISHED, this.stage.getInputPort());
+			this.stage.onSignal(Signal.FINISHED, null);
 
 		} catch (RuntimeException e) {
 			this.logger.error("Terminating thread due to the following exception: ", e);
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java
index c61cd75457fda7bac419cea523cceac5c074425c..749548bba8f697f013b88b2b80774347bd472437 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java
@@ -17,6 +17,8 @@ package teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord;
 
 import java.io.File;
 
+import teetime.variant.methodcallWithPorts.framework.core.InputPort;
+import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
 import teetime.variant.methodcallWithPorts.stage.io.File2TextLinesFilter;
@@ -30,16 +32,24 @@ import kieker.common.record.IMonitoringRecord;
  * 
  * @since 1.10
  */
-public class DatFile2RecordFilter extends Pipeline<File, IMonitoringRecord> {
+public class DatFile2RecordFilter extends Pipeline<File2TextLinesFilter, TextLine2RecordFilter> {
 
 	public DatFile2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) {
-		final File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter();
-		final TextLine2RecordFilter textLine2RecordFilter = new TextLine2RecordFilter(classNameRegistryRepository);
+		File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter();
+		TextLine2RecordFilter textLine2RecordFilter = new TextLine2RecordFilter(classNameRegistryRepository);
 
-		this.setFirstStage(file2TextLinesFilter, file2TextLinesFilter.getInputPort());
-		this.setLastStage(textLine2RecordFilter, textLine2RecordFilter.getOutputPort());
+		this.setFirstStage(file2TextLinesFilter);
+		this.setLastStage(textLine2RecordFilter);
 
 		// BETTER let the framework choose the optimal pipe implementation
 		SingleElementPipe.connect(file2TextLinesFilter.getOutputPort(), textLine2RecordFilter.getInputPort());
 	}
+
+	public InputPort<File> getInputPort() {
+		return this.getFirstStage().getInputPort();
+	}
+
+	public OutputPort<IMonitoringRecord> getOutputPort() {
+		return this.getLastStage().getOutputPort();
+	}
 }
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
index 6c1df9355dfcd418c4e658d53b8a19618603d854..85fa98ffac141865daea166bbe2e5833289aa3db 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
@@ -20,10 +20,10 @@ import teetime.variant.methodcallWithPorts.stage.Clock;
 import teetime.variant.methodcallWithPorts.stage.Counter;
 import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage;
 import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
-import teetime.variant.methodcallWithPorts.stage.EndStage;
 import teetime.variant.methodcallWithPorts.stage.InstanceCounter;
 import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
 import teetime.variant.methodcallWithPorts.stage.Relay;
+import teetime.variant.methodcallWithPorts.stage.basic.Sink;
 import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
 import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
 import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
@@ -55,38 +55,38 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
 	@Override
 	public void init() {
 		super.init();
-		StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline();
-		this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline));
+		Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
+		this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
 
-		StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000);
-		this.clockThread = new Thread(new RunnableStage<Void>(clockStage));
+		Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
+		this.clockThread = new Thread(new RunnableStage(clockStage));
 
-		StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(5000);
-		this.clock2Thread = new Thread(new RunnableStage<Void>(clock2Stage));
+		Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(5000);
+		this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
 
 		this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
 		this.workerThreads = new Thread[this.numWorkerThreads];
 
 		for (int i = 0; i < this.workerThreads.length; i++) {
-			StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
-			this.workerThreads[i] = new Thread(new RunnableStage<IMonitoringRecord>(pipeline));
+			StageWithPort pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
+			this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
 		}
 	}
 
-	private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
+	private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
 		TCPReader tcpReader = new TCPReader();
 		Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
 
 		SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
 
 		// create and configure pipeline
-		Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
+		Pipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Distributor<IMonitoringRecord>>();
 		pipeline.setFirstStage(tcpReader);
 		pipeline.setLastStage(distributor);
 		return pipeline;
 	}
 
-	private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) {
+	private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
 		Clock clock = new Clock();
 		clock.setInitialDelayInMs(intervalDelayInMs);
 		clock.setIntervalDelayInMs(intervalDelayInMs);
@@ -95,13 +95,13 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
 		SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
 
 		// create and configure pipeline
-		Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>();
+		Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>();
 		pipeline.setFirstStage(clock);
 		pipeline.setLastStage(distributor);
 		return pipeline;
 	}
 
-	private static class StageFactory<T extends StageWithPort<?, ?>> {
+	private static class StageFactory<T extends StageWithPort> {
 
 		private final Constructor<T> constructor;
 		private final List<T> stages = new ArrayList<T>();
@@ -155,9 +155,9 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
 		}
 	}
 
-	private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline,
-			final StageWithPort<Void, Long> clockStage,
-			final StageWithPort<Void, Long> clock2Stage) {
+	private Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline(final Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpReaderPipeline,
+			final Pipeline<Clock, Distributor<Long>> clockStage,
+			final Pipeline<Clock, Distributor<Long>> clock2Stage) {
 		// create stages
 		Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
 		Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
@@ -169,10 +169,10 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
 		TraceReductionFilter traceReductionFilter = new TraceReductionFilter(this.trace2buffer);
 		Counter<TraceEventRecords> traceCounter = this.traceCounterFactory.create();
 		ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create();
-		EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
+		Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>();
 
 		// connect stages
-		this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
+		this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getLastStage().getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
 
 		SingleElementPipe.connect(relay.getOutputPort(), recordCounter.getInputPort());
 		SingleElementPipe.connect(recordCounter.getOutputPort(), traceMetadataCounter.getInputPort());
@@ -186,11 +186,11 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
 		// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
 		// SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort());
 
-		SpScPipe.connect(clock2Stage.getOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
-		SpScPipe.connect(clockStage.getOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
+		SpScPipe.connect(clock2Stage.getLastStage().getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
+		SpScPipe.connect(clockStage.getLastStage().getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
 
 		// create and configure pipeline
-		Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
+		Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>();
 		pipeline.setFirstStage(relay);
 		pipeline.addIntermediateStage(recordCounter);
 		pipeline.addIntermediateStage(traceMetadataCounter);