From 468be6de13a8ef9ab695ecd0693b8ec52dd0b60e Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Wed, 30 Jul 2014 04:54:50 +0200
Subject: [PATCH] added type to ports to allow type validation at runtime

---
 .settings/org.eclipse.jdt.ui.prefs            |  4 +-
 .../framework/core/AbstractPort.java          | 31 +++++++++++++
 .../framework/core/AbstractStage.java         | 23 ++++++++++
 .../framework/core/InputPort.java             |  8 +---
 .../framework/core/InvalidPortConnection.java | 22 ++++++++++
 .../framework/core/OutputPort.java            | 13 +-----
 .../framework/core/Pipeline.java              | 35 ++++++++-------
 .../framework/core/RunnableStage.java         |  8 ++--
 .../fileToRecord/DatFile2RecordFilter.java    | 20 ++++++---
 .../TcpTraceReductionAnalysisWithThreads.java | 44 +++++++++----------
 10 files changed, 141 insertions(+), 67 deletions(-)
 create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractPort.java
 create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/InvalidPortConnection.java

diff --git a/.settings/org.eclipse.jdt.ui.prefs b/.settings/org.eclipse.jdt.ui.prefs
index eea88e80..1b99f130 100644
--- a/.settings/org.eclipse.jdt.ui.prefs
+++ b/.settings/org.eclipse.jdt.ui.prefs
@@ -45,11 +45,11 @@ cleanup.sort_members_all=false
 cleanup.use_blocks=true
 cleanup.use_blocks_only_for_return_and_throw=false
 cleanup.use_parentheses_in_expressions=true
-cleanup.use_this_for_non_static_field_access=true
+cleanup.use_this_for_non_static_field_access=false
 cleanup.use_this_for_non_static_field_access_only_if_necessary=false
 cleanup.use_this_for_non_static_method_access=true
 cleanup.use_this_for_non_static_method_access_only_if_necessary=false
-cleanup_profile=_Kieker - Clean Up
+cleanup_profile=_TeeTime
 cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractPort.java
new file mode 100644
index 00000000..ce3563c6
--- /dev/null
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractPort.java
@@ -0,0 +1,31 @@
+package teetime.variant.methodcallWithPorts.framework.core;
+
+import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
+
+public abstract class AbstractPort<T> {
+
+	protected IPipe<T> pipe;
+	/**
+	 * The type of this port.
+	 * <p>
+	 * <i>Used to validate the connection between two ports at runtime.</i>
+	 * </p>
+	 */
+	protected Class<T> type;
+
+	public IPipe<T> getPipe() {
+		return this.pipe;
+	}
+
+	public void setPipe(final IPipe<T> pipe) {
+		this.pipe = pipe;
+	}
+
+	public Class<T> getType() {
+		return this.type;
+	}
+
+	public void setType(final Class<T> type) {
+		this.type = type;
+	}
+}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
index 65e5f5b7..f50daca1 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
@@ -1,9 +1,12 @@
 package teetime.variant.methodcallWithPorts.framework.core;
 
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
 
+import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
+
 import kieker.common.logging.Log;
 import kieker.common.logging.LogFactory;
 
@@ -118,16 +121,36 @@ public abstract class AbstractStage implements StageWithPort {
 
 	protected <T> InputPort<T> createInputPort() {
 		InputPort<T> inputPort = new InputPort<T>(this);
+		// inputPort.setType(type); // TODO set type for input port
 		this.inputPortList.add(inputPort);
 		return inputPort;
 	}
 
 	protected <T> OutputPort<T> createOutputPort() {
 		OutputPort<T> outputPort = new OutputPort<T>();
+		// outputPort.setType(type); // TODO set type for output port
 		this.outputPortList.add(outputPort);
 		return outputPort;
 	}
 
+	public List<InvalidPortConnection> validateOutputPorts() {
+		List<InvalidPortConnection> invalidOutputPortMessages = new LinkedList<InvalidPortConnection>();
+
+		for (OutputPort<?> outputPort : this.getOutputPorts()) {
+			IPipe<?> pipe = outputPort.getPipe();
+			if (null != pipe) { // if output port is connected with another one
+				Class<?> sourcePortType = outputPort.getType();
+				Class<?> targetPortType = pipe.getTargetPort().getType();
+				if (null == sourcePortType || !sourcePortType.equals(targetPortType)) {
+					InvalidPortConnection invalidPortConnection = new InvalidPortConnection(outputPort, pipe.getTargetPort());
+					invalidOutputPortMessages.add(invalidPortConnection);
+				}
+			}
+		}
+
+		return invalidOutputPortMessages;
+	}
+
 	@Override
 	public String toString() {
 		return this.getClass().getName() + ": " + this.id;
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java
index 85f6ba79..42b188dc 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java
@@ -2,10 +2,9 @@ package teetime.variant.methodcallWithPorts.framework.core;
 
 import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
 
-public class InputPort<T> {
+public class InputPort<T> extends AbstractPort<T> {
 
 	private final StageWithPort owningStage;
-	private IPipe<T> pipe;
 
 	InputPort(final StageWithPort owningStage) {
 		super();
@@ -22,15 +21,12 @@ public class InputPort<T> {
 		return element;
 	}
 
-	public IPipe<T> getPipe() {
-		return this.pipe;
-	}
-
 	/**
 	 * Connects this input port with the given <code>pipe</code> bi-directionally
 	 * 
 	 * @param pipe
 	 */
+	@Override
 	public void setPipe(final IPipe<T> pipe) {
 		this.pipe = pipe;
 		pipe.setTargetPort(this);
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InvalidPortConnection.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InvalidPortConnection.java
new file mode 100644
index 00000000..d753230d
--- /dev/null
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InvalidPortConnection.java
@@ -0,0 +1,22 @@
+package teetime.variant.methodcallWithPorts.framework.core;
+
+public class InvalidPortConnection {
+
+	private final OutputPort<?> sourcePort;
+	private final InputPort<?> inputPort;
+
+	public InvalidPortConnection(final OutputPort<?> sourcePort, final InputPort<?> inputPort) {
+		super();
+		this.sourcePort = sourcePort;
+		this.inputPort = inputPort;
+	}
+
+	public OutputPort<?> getSourcePort() {
+		return sourcePort;
+	}
+
+	public InputPort<?> getInputPort() {
+		return inputPort;
+	}
+
+}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
index 9fff9078..0aeab19c 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
@@ -1,10 +1,7 @@
 package teetime.variant.methodcallWithPorts.framework.core;
 
-import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
+public class OutputPort<T> extends AbstractPort<T> {
 
-public class OutputPort<T> {
-
-	private IPipe<T> pipe;
 	/**
 	 * Performance cache: Avoids the following method chain
 	 * 
@@ -27,14 +24,6 @@ public class OutputPort<T> {
 		return this.pipe.add(element);
 	}
 
-	public IPipe<T> getPipe() {
-		return this.pipe;
-	}
-
-	public void setPipe(final IPipe<T> pipe) {
-		this.pipe = pipe;
-	}
-
 	public StageWithPort getCachedTargetStage() {
 		return this.cachedTargetStage;
 	}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java
index 5b0c5506..f20f891e 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java
@@ -8,8 +8,15 @@ import java.util.UUID;
 import kieker.common.logging.Log;
 import kieker.common.logging.LogFactory;
 
+/**
+ * 
+ * @author Christian Wulf
+ * 
+ * @param <FirstStage>
+ * @param <LastStage>
+ */
 // BETTER remove the pipeline since it does not add any new functionality
-public class Pipeline<I, O> implements StageWithPort {
+public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageWithPort> implements StageWithPort {
 
 	private final String id;
 	/**
@@ -17,11 +24,9 @@ public class Pipeline<I, O> implements StageWithPort {
 	 */
 	protected Log logger;
 
-	private StageWithPort firstStage;
-	private InputPort<I> firstStageInputPort;
+	private FirstStage firstStage;
 	private final List<StageWithPort> intermediateStages = new LinkedList<StageWithPort>();
-	private StageWithPort lastStage;
-	private OutputPort<O> lastStageOutputPort;
+	private LastStage lastStage;
 
 	// BETTER remove the stage array and use the output ports instead for passing a signal to all stages in the same thread; what about multiple same signals due to
 	// multiple input ports?
@@ -47,9 +52,8 @@ public class Pipeline<I, O> implements StageWithPort {
 		return this.id;
 	}
 
-	public void setFirstStage(final StageWithPort stage, final InputPort<I> firstStageInputPort) {
+	public void setFirstStage(final FirstStage stage) {
 		this.firstStage = stage;
-		this.firstStageInputPort = firstStageInputPort;
 	}
 
 	public void addIntermediateStages(final StageWithPort... stages) {
@@ -60,9 +64,8 @@ public class Pipeline<I, O> implements StageWithPort {
 		this.intermediateStages.add(stage);
 	}
 
-	public void setLastStage(final StageWithPort stage, final OutputPort<O> lastStageOutputPort) {
+	public void setLastStage(final LastStage stage) {
 		this.lastStage = stage;
-		this.lastStageOutputPort = lastStageOutputPort;
 	}
 
 	@Override
@@ -151,17 +154,17 @@ public class Pipeline<I, O> implements StageWithPort {
 	// this.reschedulable = reschedulable;
 	// }
 
-	public InputPort<I> getInputPort() {
-		return this.firstStageInputPort;
+	@Override
+	public void onSignal(final Signal signal, final InputPort<?> inputPort) {
+		this.firstStage.onSignal(signal, inputPort);
 	}
 
-	public OutputPort<O> getOutputPort() {
-		return this.lastStageOutputPort;
+	public FirstStage getFirstStage() {
+		return this.firstStage;
 	}
 
-	@Override
-	public void onSignal(final Signal signal, final InputPort<?> inputPort) {
-		this.firstStage.onSignal(signal, inputPort);
+	public LastStage getLastStage() {
+		return this.lastStage;
 	}
 
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java
index 47e14bdb..6c5e6057 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java
@@ -3,12 +3,12 @@ package teetime.variant.methodcallWithPorts.framework.core;
 import kieker.common.logging.Log;
 import kieker.common.logging.LogFactory;
 
-public class RunnableStage<I> implements Runnable {
+public class RunnableStage implements Runnable {
 
-	private final ConsumerStage<I> stage;
+	private final StageWithPort stage;
 	private final Log logger;
 
-	public RunnableStage(final ConsumerStage<I> stage) {
+	public RunnableStage(final StageWithPort stage) {
 		this.stage = stage;
 		this.logger = LogFactory.getLog(stage.getClass());
 	}
@@ -24,7 +24,7 @@ public class RunnableStage<I> implements Runnable {
 				this.stage.executeWithPorts();
 			} while (this.stage.isReschedulable());
 
-			this.stage.onSignal(Signal.FINISHED, this.stage.getInputPort());
+			this.stage.onSignal(Signal.FINISHED, null);
 
 		} catch (RuntimeException e) {
 			this.logger.error("Terminating thread due to the following exception: ", e);
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java
index c61cd754..749548bb 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java
@@ -17,6 +17,8 @@ package teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord;
 
 import java.io.File;
 
+import teetime.variant.methodcallWithPorts.framework.core.InputPort;
+import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
 import teetime.variant.methodcallWithPorts.stage.io.File2TextLinesFilter;
@@ -30,16 +32,24 @@ import kieker.common.record.IMonitoringRecord;
  * 
  * @since 1.10
  */
-public class DatFile2RecordFilter extends Pipeline<File, IMonitoringRecord> {
+public class DatFile2RecordFilter extends Pipeline<File2TextLinesFilter, TextLine2RecordFilter> {
 
 	public DatFile2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) {
-		final File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter();
-		final TextLine2RecordFilter textLine2RecordFilter = new TextLine2RecordFilter(classNameRegistryRepository);
+		File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter();
+		TextLine2RecordFilter textLine2RecordFilter = new TextLine2RecordFilter(classNameRegistryRepository);
 
-		this.setFirstStage(file2TextLinesFilter, file2TextLinesFilter.getInputPort());
-		this.setLastStage(textLine2RecordFilter, textLine2RecordFilter.getOutputPort());
+		this.setFirstStage(file2TextLinesFilter);
+		this.setLastStage(textLine2RecordFilter);
 
 		// BETTER let the framework choose the optimal pipe implementation
 		SingleElementPipe.connect(file2TextLinesFilter.getOutputPort(), textLine2RecordFilter.getInputPort());
 	}
+
+	public InputPort<File> getInputPort() {
+		return this.getFirstStage().getInputPort();
+	}
+
+	public OutputPort<IMonitoringRecord> getOutputPort() {
+		return this.getLastStage().getOutputPort();
+	}
 }
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
index 6c1df935..85fa98ff 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
@@ -20,10 +20,10 @@ import teetime.variant.methodcallWithPorts.stage.Clock;
 import teetime.variant.methodcallWithPorts.stage.Counter;
 import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage;
 import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
-import teetime.variant.methodcallWithPorts.stage.EndStage;
 import teetime.variant.methodcallWithPorts.stage.InstanceCounter;
 import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
 import teetime.variant.methodcallWithPorts.stage.Relay;
+import teetime.variant.methodcallWithPorts.stage.basic.Sink;
 import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
 import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
 import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
@@ -55,38 +55,38 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
 	@Override
 	public void init() {
 		super.init();
-		StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline();
-		this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline));
+		Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
+		this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
 
-		StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000);
-		this.clockThread = new Thread(new RunnableStage<Void>(clockStage));
+		Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
+		this.clockThread = new Thread(new RunnableStage(clockStage));
 
-		StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(5000);
-		this.clock2Thread = new Thread(new RunnableStage<Void>(clock2Stage));
+		Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(5000);
+		this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
 
 		this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
 		this.workerThreads = new Thread[this.numWorkerThreads];
 
 		for (int i = 0; i < this.workerThreads.length; i++) {
-			StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
-			this.workerThreads[i] = new Thread(new RunnableStage<IMonitoringRecord>(pipeline));
+			StageWithPort pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
+			this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
 		}
 	}
 
-	private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
+	private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
 		TCPReader tcpReader = new TCPReader();
 		Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
 
 		SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
 
 		// create and configure pipeline
-		Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
+		Pipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Distributor<IMonitoringRecord>>();
 		pipeline.setFirstStage(tcpReader);
 		pipeline.setLastStage(distributor);
 		return pipeline;
 	}
 
-	private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) {
+	private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
 		Clock clock = new Clock();
 		clock.setInitialDelayInMs(intervalDelayInMs);
 		clock.setIntervalDelayInMs(intervalDelayInMs);
@@ -95,13 +95,13 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
 		SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
 
 		// create and configure pipeline
-		Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>();
+		Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>();
 		pipeline.setFirstStage(clock);
 		pipeline.setLastStage(distributor);
 		return pipeline;
 	}
 
-	private static class StageFactory<T extends StageWithPort<?, ?>> {
+	private static class StageFactory<T extends StageWithPort> {
 
 		private final Constructor<T> constructor;
 		private final List<T> stages = new ArrayList<T>();
@@ -155,9 +155,9 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
 		}
 	}
 
-	private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline,
-			final StageWithPort<Void, Long> clockStage,
-			final StageWithPort<Void, Long> clock2Stage) {
+	private Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline(final Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpReaderPipeline,
+			final Pipeline<Clock, Distributor<Long>> clockStage,
+			final Pipeline<Clock, Distributor<Long>> clock2Stage) {
 		// create stages
 		Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
 		Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
@@ -169,10 +169,10 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
 		TraceReductionFilter traceReductionFilter = new TraceReductionFilter(this.trace2buffer);
 		Counter<TraceEventRecords> traceCounter = this.traceCounterFactory.create();
 		ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create();
-		EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
+		Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>();
 
 		// connect stages
-		this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
+		this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getLastStage().getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
 
 		SingleElementPipe.connect(relay.getOutputPort(), recordCounter.getInputPort());
 		SingleElementPipe.connect(recordCounter.getOutputPort(), traceMetadataCounter.getInputPort());
@@ -186,11 +186,11 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
 		// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
 		// SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort());
 
-		SpScPipe.connect(clock2Stage.getOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
-		SpScPipe.connect(clockStage.getOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
+		SpScPipe.connect(clock2Stage.getLastStage().getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
+		SpScPipe.connect(clockStage.getLastStage().getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
 
 		// create and configure pipeline
-		Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
+		Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>();
 		pipeline.setFirstStage(relay);
 		pipeline.addIntermediateStage(recordCounter);
 		pipeline.addIntermediateStage(traceMetadataCounter);
-- 
GitLab