From bd803594d13895c497b68f87cf0f0e86322a793a Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Wed, 30 Jul 2014 06:56:32 +0200
Subject: [PATCH] fixed type-safety bug in Merger; added additional output port
 to TraceReconstructionFilter

---
 .../java/teetime/util/StatisticsUtil.java     | 12 ++++++
 .../stage/basic/merger/Merger.java            |  4 +-
 .../TraceReconstructionFilter.java            | 43 ++++++++++++-------
 .../kiekerdays/TcpTraceReconstruction.java    |  2 +-
 .../kiekerdays/TcpTraceReduction.java         |  2 +-
 ...hwHomeTraceReconstructionAnalysisTest.java | 19 +++++---
 ...hwWorkTraceReconstructionAnalysisTest.java | 20 ++-------
 .../TcpTraceReconstructionAnalysis.java       |  2 +-
 .../TraceReconstructionAnalysis.java          |  7 ++-
 ...raceReconstructionAnalysisWithThreads.java |  2 +-
 .../TcpTraceReductionAnalysisWithThreads.java |  2 +-
 11 files changed, 68 insertions(+), 47 deletions(-)

diff --git a/src/main/java/teetime/util/StatisticsUtil.java b/src/main/java/teetime/util/StatisticsUtil.java
index c27035ae..a5120bba 100644
--- a/src/main/java/teetime/util/StatisticsUtil.java
+++ b/src/main/java/teetime/util/StatisticsUtil.java
@@ -17,6 +17,7 @@ package teetime.util;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -111,4 +112,15 @@ public class StatisticsUtil {
 		return quintileValues;
 	}
 
+	public static void removeFirstZeroThroughputs(final List<Long> throughputs) {
+		Iterator<Long> iterator = throughputs.iterator();
+		while (iterator.hasNext()) {
+			if (iterator.next() == 0) {
+				iterator.remove();
+			} else {
+				break;
+			}
+		}
+	}
+
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java
index 3d70d4b9..c28d0bcf 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java
@@ -21,8 +21,6 @@ import teetime.variant.methodcallWithPorts.framework.core.InputPort;
 import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 import teetime.variant.methodcallWithPorts.framework.core.Signal;
 
-import kieker.common.record.IMonitoringRecord;
-
 /**
  * 
  * This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port.
@@ -97,7 +95,7 @@ public class Merger<T> extends AbstractStage {
 		return super.getInputPorts();
 	}
 
-	public InputPort<IMonitoringRecord> getNewInputPort() {
+	public InputPort<T> getNewInputPort() {
 		return this.createInputPort();
 	}
 
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java
index 5d525495..4693eea8 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java
@@ -34,7 +34,8 @@ import kieker.common.record.flow.trace.TraceMetadata;
  */
 public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> {
 
-	private final OutputPort<TraceEventRecords> outputPort = this.createOutputPort();
+	private final OutputPort<TraceEventRecords> traceValidOutputPort = this.createOutputPort();
+	private final OutputPort<TraceEventRecords> traceInvalidOutputPort = this.createOutputPort(); // TODO send output to this port
 
 	private TimeUnit timeunit;
 	private long maxTraceDuration = Long.MAX_VALUE;
@@ -52,16 +53,25 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> {
 	protected void execute(final IFlowRecord element) {
 		final Long traceId = this.reconstructTrace(element);
 		if (traceId != null) {
-			this.putIfFinished(traceId);
+			this.put(traceId, true);
 		}
 	}
 
-	private void putIfFinished(final Long traceId) {
+	private void put(final Long traceId, final boolean onlyIfFinished) {
 		final TraceBuffer traceBuffer = this.traceId2trace.get(traceId);
-		if (traceBuffer != null && traceBuffer.isFinished()) { // null-check to check whether the trace has already been sent and removed
-			boolean removed = (null != this.traceId2trace.remove(traceId));
-			if (removed) {
-				this.put(traceBuffer);
+		if (traceBuffer != null) { // null-check to check whether the trace has already been sent and removed
+			boolean shouldSend;
+			if (onlyIfFinished) {
+				shouldSend = traceBuffer.isFinished();
+			} else {
+				shouldSend = true;
+			}
+
+			if (shouldSend) {
+				boolean removed = (null != this.traceId2trace.remove(traceId));
+				if (removed) {
+					this.sendTraceBuffer(traceBuffer);
+				}
 			}
 		}
 	}
@@ -86,17 +96,16 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> {
 	@Override
 	public void onIsPipelineHead() {
 		for (Long traceId : this.traceId2trace.keySet()) {
-			this.putIfFinished(traceId); // FIXME also put invalid traces at the end
+			this.put(traceId, false);
 		}
 
 		super.onIsPipelineHead();
 	}
 
-	private void put(final TraceBuffer traceBuffer) {
-		// final IOutputPort<TraceReconstructionFilter, TraceEventRecords> outputPort =
-		// (traceBuffer.isInvalid()) ? this.traceInvalidOutputPort : this.traceValidOutputPort;
-		// context.put(outputPort, traceBuffer.toTraceEvents());
-		this.send(this.outputPort, traceBuffer.toTraceEvents());
+	private void sendTraceBuffer(final TraceBuffer traceBuffer) {
+		OutputPort<TraceEventRecords> outputPort = (traceBuffer.isInvalid()) ? this.traceInvalidOutputPort
+				: this.traceValidOutputPort;
+		this.send(outputPort, traceBuffer.toTraceEvents());
 	}
 
 	public TimeUnit getTimeunit() {
@@ -131,8 +140,12 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> {
 		this.maxEncounteredLoggingTimestamp = maxEncounteredLoggingTimestamp;
 	}
 
-	public OutputPort<TraceEventRecords> getOutputPort() {
-		return this.outputPort;
+	public OutputPort<TraceEventRecords> getTraceValidOutputPort() {
+		return this.traceValidOutputPort;
+	}
+
+	public OutputPort<TraceEventRecords> getTraceInvalidOutputPort() {
+		return this.traceInvalidOutputPort;
 	}
 
 	// public Map<Long, TraceBuffer> getTraceId2trace() {
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java
index 5c5c568a..b6c76cd8 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java
@@ -80,7 +80,7 @@ public class TcpTraceReconstruction extends Analysis {
 
 		SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
 		SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
-		SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort());
+		SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), endStage.getInputPort());
 
 		// create and configure pipeline
 		Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>();
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java
index b450cc20..e01c1b52 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java
@@ -107,7 +107,7 @@ public class TcpTraceReduction extends Analysis {
 
 		SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
 		SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
-		SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceReductionFilter.getInputPort());
+		SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), traceReductionFilter.getInputPort());
 		SingleElementPipe.connect(traceReductionFilter.getOutputPort(), endStage.getInputPort());
 
 		SpScPipe.connect(clockStage.getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java
index 26f2baf4..67043576 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java
@@ -69,8 +69,9 @@ public class ChwHomeTraceReconstructionAnalysisTest {
 			analysis.onTerminate();
 		}
 
+		StatisticsUtil.removeFirstZeroThroughputs(analysis.getThroughputs());
 		Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
-		System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
+		System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit");
 
 		assertEquals(50002, analysis.getNumRecords());
 		assertEquals(2, analysis.getNumTraces());
@@ -80,6 +81,8 @@ public class ChwHomeTraceReconstructionAnalysisTest {
 
 		TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
 		assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
+
+		assertThat(quintiles.get(0.5), is(both(greaterThan(34l)).and(lessThan(320l))));
 	}
 
 	@Test
@@ -96,6 +99,10 @@ public class ChwHomeTraceReconstructionAnalysisTest {
 			analysis.onTerminate();
 		}
 
+		StatisticsUtil.removeFirstZeroThroughputs(analysis.getThroughputs());
+		Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
+		System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit");
+
 		assertEquals(1489902, analysis.getNumRecords());
 		assertEquals(24013, analysis.getNumTraces());
 
@@ -105,9 +112,6 @@ public class ChwHomeTraceReconstructionAnalysisTest {
 		TraceEventRecords trace1 = analysis.getElementCollection().get(1);
 		assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
 
-		Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
-		System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
-
 		assertThat(quintiles.get(0.5), is(both(greaterThan(1700l)).and(lessThan(1900l))));
 	}
 
@@ -125,6 +129,10 @@ public class ChwHomeTraceReconstructionAnalysisTest {
 			analysis.onTerminate();
 		}
 
+		StatisticsUtil.removeFirstZeroThroughputs(analysis.getThroughputs());
+		Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
+		System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit");
+
 		assertEquals(17371, analysis.getNumRecords());
 		assertEquals(22, analysis.getNumTraces());
 
@@ -134,8 +142,7 @@ public class ChwHomeTraceReconstructionAnalysisTest {
 		TraceEventRecords trace1 = analysis.getElementCollection().get(1);
 		assertEquals(1, trace1.getTraceMetadata().getTraceId());
 
-		Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
-		System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
+		assertThat(quintiles.get(0.5), is(both(greaterThan(200l)).and(lessThan(250l))));
 	}
 
 }
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java
index 1bfbc094..5e112cd7 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java
@@ -23,8 +23,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 import java.io.File;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -80,7 +78,7 @@ public class ChwWorkTraceReconstructionAnalysisTest {
 		TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
 		assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
 
-		this.removeFirstZeroThroughputs(analysis);
+		StatisticsUtil.removeFirstZeroThroughputs(analysis.getThroughputs());
 		Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
 		System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit");
 	}
@@ -108,7 +106,7 @@ public class ChwWorkTraceReconstructionAnalysisTest {
 		TraceEventRecords trace1 = analysis.getElementCollection().get(1);
 		assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
 
-		this.removeFirstZeroThroughputs(analysis);
+		StatisticsUtil.removeFirstZeroThroughputs(analysis.getThroughputs());
 		Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
 		System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit");
 
@@ -138,21 +136,9 @@ public class ChwWorkTraceReconstructionAnalysisTest {
 		TraceEventRecords trace1 = analysis.getElementCollection().get(1);
 		assertEquals(1, trace1.getTraceMetadata().getTraceId());
 
-		this.removeFirstZeroThroughputs(analysis);
+		StatisticsUtil.removeFirstZeroThroughputs(analysis.getThroughputs());
 		Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
 		System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit");
 	}
 
-	private void removeFirstZeroThroughputs(final TraceReconstructionAnalysis analysis) {
-		List<Long> throughputs = analysis.getThroughputs();
-		Iterator<Long> iterator = throughputs.iterator();
-		while (iterator.hasNext()) {
-			if (iterator.next() == 0) {
-				iterator.remove();
-			} else {
-				break;
-			}
-		}
-	}
-
 }
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java
index 57bf8054..a26578de 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java
@@ -89,7 +89,7 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
 		// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
 		// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort());
 		// SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
-		SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceCounter.getInputPort());
+		SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort());
 		SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort());
 
 		SpScPipe.connect(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10);
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java
index 26ae3b49..5b9bce4c 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java
@@ -18,6 +18,7 @@ import teetime.variant.methodcallWithPorts.stage.CollectorSink;
 import teetime.variant.methodcallWithPorts.stage.Counter;
 import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
 import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
+import teetime.variant.methodcallWithPorts.stage.basic.merger.Merger;
 import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter;
 import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
 import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
@@ -75,6 +76,7 @@ public class TraceReconstructionAnalysis extends Analysis {
 				IFlowRecord.class);
 		this.throughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>();
 		final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace);
+		Merger<TraceEventRecords> merger = new Merger<TraceEventRecords>();
 		this.traceCounter = new Counter<TraceEventRecords>();
 		final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection);
 
@@ -91,7 +93,9 @@ public class TraceReconstructionAnalysis extends Analysis {
 		SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.throughputFilter.getInputPort());
 		SingleElementPipe.connect(this.throughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
 		// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
-		SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceCounter.getInputPort());
+		SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), merger.getNewInputPort());
+		SingleElementPipe.connect(traceReconstructionFilter.getTraceInvalidOutputPort(), merger.getNewInputPort());
+		SingleElementPipe.connect(merger.getOutputPort(), this.traceCounter.getInputPort());
 		SingleElementPipe.connect(this.traceCounter.getOutputPort(), collector.getInputPort());
 
 		SpScPipe.connect(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1);
@@ -108,6 +112,7 @@ public class TraceReconstructionAnalysis extends Analysis {
 		pipeline.addIntermediateStage(instanceOfFilter);
 		pipeline.addIntermediateStage(this.throughputFilter);
 		pipeline.addIntermediateStage(traceReconstructionFilter);
+		pipeline.addIntermediateStage(merger);
 		pipeline.addIntermediateStage(this.traceCounter);
 		pipeline.setLastStage(collector);
 		return pipeline;
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
index 0817310f..0a718b4f 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
@@ -184,7 +184,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
 		SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), traceMetadataCounter.getInputPort());
 		SingleElementPipe.connect(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort());
 		SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
-		SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceCounter.getInputPort());
+		SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), traceCounter.getInputPort());
 		// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
 		// SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort());
 		SingleElementPipe.connect(traceCounter.getOutputPort(), endStage.getInputPort());
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
index 85fa98ff..3f1d0ae1 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
@@ -178,7 +178,7 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
 		SingleElementPipe.connect(recordCounter.getOutputPort(), traceMetadataCounter.getInputPort());
 		SingleElementPipe.connect(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort());
 		SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
-		SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceReductionFilter.getInputPort());
+		SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), traceReductionFilter.getInputPort());
 		SingleElementPipe.connect(traceReductionFilter.getOutputPort(), traceCounter.getInputPort());
 		SingleElementPipe.connect(traceCounter.getOutputPort(), traceThroughputFilter.getInputPort());
 		SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort());
-- 
GitLab