Skip to content
Snippets Groups Projects
Commit 536c6faf authored by Christian Wulf's avatar Christian Wulf
Browse files

fixed concurrency bug in TraceReconstructionFilter

parent db98abb3
No related branches found
No related tags found
No related merge requests found
......@@ -18,13 +18,13 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
@Override
protected void execute5(final T element) {
this.numPassedElements++;
this.send(element);
Long timestampInNs = this.triggerInputPort.receive();
if (timestampInNs != null) {
this.computeElementThroughput(System.nanoTime());
}
this.numPassedElements++;
this.send(element);
}
@Override
......
......@@ -15,7 +15,6 @@
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
......@@ -57,7 +56,7 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE
private void putIfFinished(final Long traceId) {
final TraceBuffer traceBuffer = this.traceId2trace.get(traceId);
if (traceBuffer != null && traceBuffer.isFinished()) { // null-check to check whether the trace has already been sent and removed
boolean removed = null != this.traceId2trace.remove(traceId);
boolean removed = (null != this.traceId2trace.remove(traceId));
if (removed) {
this.put(traceBuffer);
}
......@@ -83,15 +82,8 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE
@Override
public void onIsPipelineHead() {
synchronized (this.traceId2trace) {
Iterator<TraceBuffer> iterator = this.traceId2trace.values().iterator();
while (iterator.hasNext()) {
TraceBuffer traceBuffer = iterator.next();
if (traceBuffer.isFinished()) { // FIXME remove isFinished
this.put(traceBuffer); // BETTER put outside of synchronized
iterator.remove();
}
}
for (Long traceId : this.traceId2trace.keySet()) {
this.putIfFinished(traceId); // FIXME also put invalid traces at the end
}
super.onIsPipelineHead();
......
......@@ -26,7 +26,7 @@ import kieker.common.record.flow.IFlowRecord;
public class TcpTraceReconstruction extends Analysis {
private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
private static final int TCP_RELAY_MAX_SIZE = 100000;
private static final int TCP_RELAY_MAX_SIZE = 500000;
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
......
......@@ -32,7 +32,7 @@ import kieker.common.record.flow.IFlowRecord;
public class TcpTraceReduction extends Analysis {
private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
private static final int TCP_RELAY_MAX_SIZE = 100000;
private static final int TCP_RELAY_MAX_SIZE = 500000;
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
......
......@@ -34,7 +34,7 @@ import kieker.common.record.flow.trace.TraceMetadata;
public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
private static final int TCP_RELAY_MAX_SIZE = 10000000;
private static final int TCP_RELAY_MAX_SIZE = 2000000;
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment