From 8ba52389493637f42a6eaa7c9b55a59065bf5ad6 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Mon, 30 Jun 2014 08:16:34 +0200
Subject: [PATCH] added performance test
 TcpTraceReconstructionAnalysisWithThreads

---
 .../java/teetime/util/HashMapWithDefault.java |  54 ++++++
 .../framework/core/pipe/SpScPipe.java         |  15 +-
 .../stage/Distributor.java                    |   4 +-
 .../methodcallWithPorts/stage/Relay.java      |   1 +
 .../TraceReconstructionFilter.java            |  51 +----
 ...orkTcpTraceReconstructionAnalysisTest.java |  11 +-
 .../TcpTraceReconstructionAnalysis.java       |   7 +-
 ...ReconstructionAnalysisWithThreadsTest.java |  81 ++++++++
 ...raceReconstructionAnalysisWithThreads.java | 179 ++++++++++++++++++
 9 files changed, 340 insertions(+), 63 deletions(-)
 create mode 100644 src/main/java/teetime/util/HashMapWithDefault.java
 create mode 100644 src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
 create mode 100644 src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java

diff --git a/src/main/java/teetime/util/HashMapWithDefault.java b/src/main/java/teetime/util/HashMapWithDefault.java
new file mode 100644
index 00000000..4292c243
--- /dev/null
+++ b/src/main/java/teetime/util/HashMapWithDefault.java
@@ -0,0 +1,54 @@
+/***************************************************************************
+ * Copyright 2014 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+package teetime.util;
+
+import java.util.HashMap;
+
+import teetime.util.concurrent.hashmap.ValueFactory;
+
+/**
+ * @author Christian Wulf
+ * 
+ * @since 1.10
+ */
+public class HashMapWithDefault<K, V> extends HashMap<K, V> {
+
+	private static final long serialVersionUID = -7958038532219740472L;
+
+	private final ValueFactory<V> valueFactory;
+
+	/**
+	 * @since 1.10
+	 */
+	public HashMapWithDefault(final ValueFactory<V> valueFactory) {
+		this.valueFactory = valueFactory;
+	}
+
+	/**
+	 * @return the corresponding value if the key exists. Otherwise, it creates,
+	 *         inserts, and returns a new default value.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public V get(final Object key) {
+		V value = super.get(key);
+		if (value == null) {
+			value = this.valueFactory.create();
+			super.put((K) key, value);
+		}
+		return value;
+	}
+}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
index 67c6dca5..9570f8fd 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
@@ -7,23 +7,26 @@ import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
 public class SpScPipe<T> extends AbstractPipe<T> {
 
 	private final FFBufferOrdered3<T> queue;
+	private int maxSize;
 
-	private SpScPipe(final int initialCapacity) {
-		this.queue = new FFBufferOrdered3<T>(initialCapacity);
+	private SpScPipe(final int capacity) {
+		this.queue = new FFBufferOrdered3<T>(capacity);
 	}
 
-	public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort, final int initialCapacity) {
-		IPipe<T> pipe = new SpScPipe<T>(initialCapacity);
+	public static <T> SpScPipe<T> connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		SpScPipe<T> pipe = new SpScPipe<T>(capacity);
 		targetPort.setPipe(pipe);
 		if (sourcePort != null) {
 			sourcePort.setPipe(pipe);
 			sourcePort.setCachedTargetStage(targetPort.getOwningStage());
 		}
+		return pipe;
 	}
 
 	@Override
 	public void add(final T element) {
 		this.queue.offer(element);
+		this.maxSize = Math.max(this.queue.size(), this.maxSize);
 	}
 
 	@Override
@@ -46,4 +49,8 @@ public class SpScPipe<T> extends AbstractPipe<T> {
 		return this.queue.peek();
 	}
 
+	public int getMaxSize() {
+		return this.maxSize;
+	}
+
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java
index 5d8be6cd..e60b19e1 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java
@@ -38,7 +38,9 @@ public final class Distributor<T> extends AbstractStage<T, T> {
 	public void onIsPipelineHead() {
 		for (OutputPort<?> op : this.outputPorts) {
 			op.getPipe().close();
-			System.out.println("End signal sent, size: " + op.getPipe().size());
+			if (this.logger.isDebugEnabled()) {
+				this.logger.debug("End signal sent, size: " + op.getPipe().size());
+			}
 		}
 
 		// for (OutputPort<?> op : this.outputPorts) {
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java
index c4e7a006..0486733e 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java
@@ -16,6 +16,7 @@ public class Relay<T> extends AbstractStage<T, T> {
 			if (this.getInputPort().getPipe().isClosed()) {
 				this.setReschedulable(false);
 				System.out.println("got end signal; pipe.size: " + this.getInputPort().getPipe().size());
+				assert 0 == this.getInputPort().getPipe().size();
 			}
 			return;
 		}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java
index 5e8e35c0..aa307ce4 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java
@@ -17,10 +17,9 @@ package teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction;
 
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 
-import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
+import teetime.util.HashMapWithDefault;
 import teetime.util.concurrent.hashmap.TraceBuffer;
 import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
 
@@ -39,46 +38,22 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE
 	private TimeUnit timeunit;
 	private long maxTraceDuration = Long.MAX_VALUE;
 	private long maxTraceTimeout = Long.MAX_VALUE;
-	private boolean timeout;
 	private long maxEncounteredLoggingTimestamp = -1;
 
-	private static final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
+	private static final Map<Long, TraceBuffer> traceId2trace = new HashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
 
 	@Override
 	protected void execute5(final IFlowRecord element) {
 		final Long traceId = this.reconstructTrace(element);
 		if (traceId != null) {
 			this.putIfFinished(traceId);
-			this.processTimestamp(element);
 		}
 	}
 
-	private void processTimestamp(final IFlowRecord record) {
-		if (this.timeout) {
-			synchronized (this) {
-				final long loggingTimestamp = this.getTimestamp(record);
-				// can we assume a rough order of logging timestamps? (yes, except with DB reader)
-				if (loggingTimestamp > this.maxEncounteredLoggingTimestamp) {
-					this.maxEncounteredLoggingTimestamp = loggingTimestamp;
-				}
-				this.processTimeoutQueue(this.maxEncounteredLoggingTimestamp);
-			}
-		}
-	}
-
-	private long getTimestamp(final IFlowRecord record) {
-		if (record instanceof AbstractTraceEvent) {
-			return ((AbstractTraceEvent) record).getTimestamp();
-		}
-		return -1;
-	}
-
 	private void putIfFinished(final Long traceId) {
 		final TraceBuffer traceBuffer = TraceReconstructionFilter.traceId2trace.get(traceId);
 		if (traceBuffer.isFinished()) {
-			synchronized (this) { // has to be synchronized because of timeout cleanup
-				TraceReconstructionFilter.traceId2trace.remove(traceId);
-			}
+			TraceReconstructionFilter.traceId2trace.remove(traceId);
 			this.put(traceBuffer);
 		}
 	}
@@ -100,12 +75,6 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE
 		return traceId;
 	}
 
-	@Override
-	public void onStart() {
-		this.timeout = !((this.maxTraceTimeout == Long.MAX_VALUE) && (this.maxTraceDuration == Long.MAX_VALUE));
-		super.onStart();
-	}
-
 	@Override
 	public void onIsPipelineHead() {
 		Iterator<TraceBuffer> iterator = TraceReconstructionFilter.traceId2trace.values().iterator();
@@ -118,20 +87,6 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE
 		super.onIsPipelineHead();
 	}
 
-	private void processTimeoutQueue(final long timestamp) {
-		final long duration = timestamp - this.maxTraceDuration;
-		final long traceTimeout = timestamp - this.maxTraceTimeout;
-
-		for (final Iterator<Entry<Long, TraceBuffer>> iterator = TraceReconstructionFilter.traceId2trace.entrySet().iterator(); iterator.hasNext();) {
-			final TraceBuffer traceBuffer = iterator.next().getValue();
-			if ((traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) // long time no see
-					|| (traceBuffer.getMinLoggingTimestamp() <= duration)) { // max duration is gone
-				this.put(traceBuffer);
-				iterator.remove();
-			}
-		}
-	}
-
 	private void put(final TraceBuffer traceBuffer) {
 		// final IOutputPort<TraceReconstructionFilter, TraceEventRecords> outputPort =
 		// (traceBuffer.isInvalid()) ? this.traceInvalidOutputPort : this.traceValidOutputPort;
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java
index e1693a94..316af869 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java
@@ -60,8 +60,11 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest {
 			analysis.onTerminate();
 		}
 
-		assertEquals(21001, analysis.getNumRecords());
-		assertEquals(1000, analysis.getNumTraces());
+		Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
+		System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
+
+		// assertEquals(1000, analysis.getNumTraces());
+		assertEquals(1000000, analysis.getNumTraces());
 
 		// TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
 		// assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
@@ -69,8 +72,8 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest {
 		// TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
 		// assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
 
-		Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordThroughputs());
-		System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
+		// assertEquals(21001, analysis.getNumRecords());
+		assertEquals(21000001, analysis.getNumRecords());
 	}
 
 }
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java
index c86e10c0..329cbcc0 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java
@@ -15,7 +15,6 @@ import teetime.variant.methodcallWithPorts.stage.EndStage;
 import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
 import teetime.variant.methodcallWithPorts.stage.ThroughputFilter;
 import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
-import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
 import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
 
 import kieker.analysis.plugin.filter.flow.TraceEventRecords;
@@ -30,8 +29,6 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
 	private Thread clock2Thread;
 	private Thread workerThread;
 
-	private ClassNameRegistryRepository classNameRegistryRepository;
-
 	private CountingFilter<IMonitoringRecord> recordCounter;
 
 	private CountingFilter<TraceEventRecords> traceCounter;
@@ -67,8 +64,6 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
 	}
 
 	private Pipeline<Void, TraceEventRecords> buildPipeline(final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) {
-		this.classNameRegistryRepository = new ClassNameRegistryRepository();
-
 		// create stages
 		TCPReader tcpReader = new TCPReader();
 		this.recordCounter = new CountingFilter<IMonitoringRecord>();
@@ -139,7 +134,7 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
 		return this.recordThroughputFilter.getThroughputs();
 	}
 
-	public List<Long> getTraceThroughputFilter() {
+	public List<Long> getTraceThroughputs() {
 		return this.traceThroughputFilter.getThroughputs();
 	}
 
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
new file mode 100644
index 00000000..2c1e1f86
--- /dev/null
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
@@ -0,0 +1,81 @@
+/***************************************************************************
+ * Copyright 2014 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import teetime.util.StatisticsUtil;
+import teetime.util.StopWatch;
+
+/**
+ * @author Christian Wulf
+ * 
+ * @since 1.10
+ */
+public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
+
+	private StopWatch stopWatch;
+
+	@Before
+	public void before() {
+		this.stopWatch = new StopWatch();
+	}
+
+	@After
+	public void after() {
+		long overallDurationInNs = this.stopWatch.getDurationInNs();
+		System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
+	}
+
+	@Test
+	public void performAnalysis() {
+		final TcpTraceReconstructionAnalysisWithThreads analysis = new TcpTraceReconstructionAnalysisWithThreads();
+		analysis.init();
+
+		this.stopWatch.start();
+		try {
+			analysis.start();
+		} finally {
+			this.stopWatch.end();
+			analysis.onTerminate();
+		}
+
+		System.out.println("Max size of pipe: " + analysis.getTcpRelayPipe().getMaxSize());
+
+		Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
+		System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
+
+		// assertEquals(1000, analysis.getNumTraces());
+		assertEquals(1000000, analysis.getNumTraces());
+
+		// TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
+		// assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
+		//
+		// TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
+		// assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
+
+		// assertEquals(21001, analysis.getNumRecords());
+		assertEquals(21000001, analysis.getNumRecords());
+	}
+
+}
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
new file mode 100644
index 00000000..f7fef31a
--- /dev/null
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
@@ -0,0 +1,179 @@
+package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import teetime.variant.explicitScheduling.framework.core.Analysis;
+import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
+import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
+import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
+import teetime.variant.methodcallWithPorts.stage.Clock;
+import teetime.variant.methodcallWithPorts.stage.CountingFilter;
+import teetime.variant.methodcallWithPorts.stage.Distributor;
+import teetime.variant.methodcallWithPorts.stage.EndStage;
+import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
+import teetime.variant.methodcallWithPorts.stage.Relay;
+import teetime.variant.methodcallWithPorts.stage.ThroughputFilter;
+import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
+import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
+
+import kieker.analysis.plugin.filter.flow.TraceEventRecords;
+import kieker.common.record.IMonitoringRecord;
+import kieker.common.record.flow.IFlowRecord;
+
+public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
+
+	private static final int TCP_RELAY_MAX_SIZE = 500000;
+
+	private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
+
+	private Thread tcpThread;
+	private Thread clockThread;
+	private Thread clock2Thread;
+	private Thread workerThread;
+
+	private CountingFilter<IMonitoringRecord> recordCounter;
+
+	private CountingFilter<TraceEventRecords> traceCounter;
+
+	private ThroughputFilter<IFlowRecord> recordThroughputFilter;
+	private ThroughputFilter<TraceEventRecords> traceThroughputFilter;
+
+	private SpScPipe<IMonitoringRecord> tcpRelayPipe;
+
+	@Override
+	public void init() {
+		super.init();
+		StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline();
+		this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
+
+		StageWithPort<Void, Long> clockStage = this.buildClockPipeline();
+		this.clockThread = new Thread(new RunnableStage(clockStage));
+
+		StageWithPort<Void, Long> clock2Stage = this.buildClock2Pipeline();
+		this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
+
+		StageWithPort<?, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
+		this.workerThread = new Thread(new RunnableStage(pipeline));
+
+	}
+
+	private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
+		TCPReader tcpReader = new TCPReader();
+		Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
+
+		SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
+
+		// create and configure pipeline
+		Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
+		pipeline.setFirstStage(tcpReader);
+		pipeline.setLastStage(distributor);
+		return pipeline;
+	}
+
+	private StageWithPort<Void, Long> buildClockPipeline() {
+		Clock clock = new Clock();
+		clock.setIntervalDelayInMs(1000);
+
+		return clock;
+	}
+
+	private StageWithPort<Void, Long> buildClock2Pipeline() {
+		Clock clock = new Clock();
+		clock.setIntervalDelayInMs(2000);
+
+		return clock;
+	}
+
+	private Pipeline<IMonitoringRecord, TraceEventRecords> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline,
+			final StageWithPort<Void, Long> clockStage,
+			final StageWithPort<Void, Long> clock2Stage) {
+		// create stages
+		Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
+		this.recordCounter = new CountingFilter<IMonitoringRecord>();
+		final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
+				IFlowRecord.class);
+		this.recordThroughputFilter = new ThroughputFilter<IFlowRecord>();
+		final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
+		this.traceThroughputFilter = new ThroughputFilter<TraceEventRecords>();
+		this.traceCounter = new CountingFilter<TraceEventRecords>();
+		EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
+
+		// connect stages
+		this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
+		SingleElementPipe.connect(relay.getOutputPort(), this.recordCounter.getInputPort());
+		SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort());
+		// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
+		// SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
+		SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
+		SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort());
+		SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
+		SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort());
+
+		SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 1);
+		SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 1);
+
+		// create and configure pipeline
+		Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
+		pipeline.setFirstStage(relay);
+		pipeline.addIntermediateStage(this.recordCounter);
+		pipeline.addIntermediateStage(instanceOfFilter);
+		// pipeline.addIntermediateStage(this.recordThroughputFilter);
+		pipeline.addIntermediateStage(traceReconstructionFilter);
+		pipeline.addIntermediateStage(this.traceThroughputFilter);
+		pipeline.addIntermediateStage(this.traceCounter);
+		pipeline.setLastStage(endStage);
+		return pipeline;
+	}
+
+	@Override
+	public void start() {
+		super.start();
+
+		this.tcpThread.start();
+		// this.clockThread.start();
+		this.clock2Thread.start();
+		this.workerThread.start();
+
+		try {
+			this.tcpThread.join();
+		} catch (InterruptedException e) {
+			throw new IllegalStateException(e);
+		}
+
+		try {
+			this.workerThread.join();
+		} catch (InterruptedException e) {
+			throw new IllegalStateException(e);
+		}
+		this.clockThread.interrupt();
+		this.clock2Thread.interrupt();
+	}
+
+	public List<TraceEventRecords> getElementCollection() {
+		return this.elementCollection;
+	}
+
+	public int getNumRecords() {
+		return this.recordCounter.getNumElementsPassed();
+	}
+
+	public int getNumTraces() {
+		return this.traceCounter.getNumElementsPassed();
+	}
+
+	public List<Long> getRecordThroughputs() {
+		return this.recordThroughputFilter.getThroughputs();
+	}
+
+	public List<Long> getTraceThroughputs() {
+		return this.traceThroughputFilter.getThroughputs();
+	}
+
+	public SpScPipe<IMonitoringRecord> getTcpRelayPipe() {
+		return this.tcpRelayPipe;
+	}
+
+}
-- 
GitLab