From 370e8c1b8a11570e78d14d6164971a53bcebe74d Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Mon, 7 Jul 2014 14:31:34 +0200
Subject: [PATCH] fixed concurrency bug in TraceReconstructionFilter

---
 .../stage/ElementThroughputMeasuringStage.java     |  6 +++---
 .../TraceReconstructionFilter.java                 | 14 +++-----------
 .../kiekerdays/TcpTraceReconstruction.java         |  2 +-
 .../examples/kiekerdays/TcpTraceReduction.java     |  2 +-
 .../TcpTraceReconstructionAnalysisWithThreads.java |  2 +-
 5 files changed, 9 insertions(+), 17 deletions(-)

diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java
index 66b9bfd..956e13a 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java
@@ -18,13 +18,13 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
 
 	@Override
 	protected void execute5(final T element) {
-		this.numPassedElements++;
-		this.send(element);
-
 		Long timestampInNs = this.triggerInputPort.receive();
 		if (timestampInNs != null) {
 			this.computeElementThroughput(System.nanoTime());
 		}
+		this.numPassedElements++;
+
+		this.send(element);
 	}
 
 	@Override
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java
index cbd2644..8f0315e 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java
@@ -15,7 +15,6 @@
  ***************************************************************************/
 package teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction;
 
-import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
 import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
@@ -57,7 +56,7 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE
 	private void putIfFinished(final Long traceId) {
 		final TraceBuffer traceBuffer = this.traceId2trace.get(traceId);
 		if (traceBuffer != null && traceBuffer.isFinished()) { // null-check to check whether the trace has already been sent and removed
-			boolean removed = null != this.traceId2trace.remove(traceId);
+			boolean removed = (null != this.traceId2trace.remove(traceId));
 			if (removed) {
 				this.put(traceBuffer);
 			}
@@ -83,15 +82,8 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE
 
 	@Override
 	public void onIsPipelineHead() {
-		synchronized (this.traceId2trace) {
-			Iterator<TraceBuffer> iterator = this.traceId2trace.values().iterator();
-			while (iterator.hasNext()) {
-				TraceBuffer traceBuffer = iterator.next();
-				if (traceBuffer.isFinished()) { // FIXME remove isFinished
-					this.put(traceBuffer); // BETTER put outside of synchronized
-					iterator.remove();
-				}
-			}
+		for (Long traceId : this.traceId2trace.keySet()) {
+			this.putIfFinished(traceId); // FIXME also put invalid traces at the end
 		}
 
 		super.onIsPipelineHead();
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java
index 50688b4..085a0ed 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java
@@ -26,7 +26,7 @@ import kieker.common.record.flow.IFlowRecord;
 public class TcpTraceReconstruction extends Analysis {
 
 	private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
-	private static final int TCP_RELAY_MAX_SIZE = 100000;
+	private static final int TCP_RELAY_MAX_SIZE = 500000;
 
 	private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
 	private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java
index fb5eb3c..c20dd7e 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java
@@ -32,7 +32,7 @@ import kieker.common.record.flow.IFlowRecord;
 public class TcpTraceReduction extends Analysis {
 
 	private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
-	private static final int TCP_RELAY_MAX_SIZE = 100000;
+	private static final int TCP_RELAY_MAX_SIZE = 500000;
 
 	private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
 	private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
index b268f11..20a52ac 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
@@ -34,7 +34,7 @@ import kieker.common.record.flow.trace.TraceMetadata;
 public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
 
 	private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
-	private static final int TCP_RELAY_MAX_SIZE = 10000000;
+	private static final int TCP_RELAY_MAX_SIZE = 2000000;
 
 	private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
 
-- 
GitLab