Skip to content
Snippets Groups Projects
Commit c0cd035c authored by Christian Wulf's avatar Christian Wulf
Browse files

identified bug

parent ead2a19c
No related branches found
No related tags found
No related merge requests found
Showing with 80 additions and 40 deletions
......@@ -51,7 +51,8 @@ public class SpScPipe<T> extends AbstractPipe<T> {
return this.queue.peek();
}
public int getMaxSize() {
// BETTER find a solution w/o any thread-safe code in this stage
public synchronized int getMaxSize() {
return this.maxSize;
}
......
......@@ -13,7 +13,8 @@ public class CountingFilter<T> extends ConsumerStage<T, T> {
this.send(element);
}
public int getNumElementsPassed() {
// BETTER find a solution w/o any thread-safe code in this stage
public synchronized int getNumElementsPassed() {
return this.numElementsPassed;
}
......
......@@ -48,17 +48,22 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE
@Override
protected void execute5(final IFlowRecord element) {
// synchronized (this.traceId2trace) {// TODO remove if everything works
final Long traceId = this.reconstructTrace(element);
if (traceId != null) {
this.putIfFinished(traceId);
}
// }
}
private void putIfFinished(final Long traceId) {
final TraceBuffer traceBuffer = this.traceId2trace.get(traceId);
if (traceBuffer.isFinished()) {
this.traceId2trace.remove(traceId);
this.put(traceBuffer);
synchronized (this.traceId2trace) {
if (null != this.traceId2trace.remove(traceId)) {
this.put(traceBuffer);
}
}
}
}
......@@ -66,12 +71,12 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE
Long traceId = null;
if (record instanceof TraceMetadata) {
traceId = ((TraceMetadata) record).getTraceId();
final TraceBuffer traceBuffer = this.traceId2trace.get(traceId);
TraceBuffer traceBuffer = this.traceId2trace.get(traceId);
traceBuffer.setTrace((TraceMetadata) record);
} else if (record instanceof AbstractTraceEvent) {
traceId = ((AbstractTraceEvent) record).getTraceId();
final TraceBuffer traceBuffer = this.traceId2trace.get(traceId);
TraceBuffer traceBuffer = this.traceId2trace.get(traceId);
traceBuffer.insertEvent((AbstractTraceEvent) record);
}
......@@ -81,11 +86,15 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE
@Override
public void onIsPipelineHead() {
Iterator<TraceBuffer> iterator = this.traceId2trace.values().iterator();
while (iterator.hasNext()) {
TraceBuffer traceBuffer = iterator.next();
this.put(traceBuffer);
iterator.remove();
synchronized (this.traceId2trace) {
Iterator<TraceBuffer> iterator = this.traceId2trace.values().iterator();
while (iterator.hasNext()) {
TraceBuffer traceBuffer = iterator.next();
if (traceBuffer.isFinished()) { // FIXME remove isFinished
this.put(traceBuffer);
iterator.remove();
}
}
}
super.onIsPipelineHead();
......
......@@ -25,7 +25,7 @@ public class TcpTraceLogging extends Analysis {
public void init() {
super.init();
StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline));
}
private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
......
......@@ -74,6 +74,12 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest {
// assertEquals(21001, analysis.getNumRecords());
assertEquals(21000001, analysis.getNumRecords());
// until 04.07.2014 (inkl.)
// Median throughput: 74 elements/time unit
// Duration: 17445 ms
// Median throughput: 78 elements/time unit
// Duration: 16608 ms
}
}
......@@ -28,6 +28,9 @@ import org.junit.runners.MethodSorters;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import kieker.common.record.IMonitoringRecord;
/**
* @author Christian Wulf
......@@ -65,6 +68,20 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
this.performAnalysis(4);
}
// until 04.07.2014 (incl.)
// Max size of tcp-relay pipe: 143560
// Median trace throughput: 115 traces/time unit
// Duration: 12907 ms
// Max size of tcp-relay pipe: 51948
// Median trace throughput: 42 traces/time unit
// Duration: 21614 ms
// [2014-07-04 01:06:10 PM] WARNUNG: Reader interrupted (teetime.variant.methodcallWithPorts.stage.io.TCPReader$TCPStringReader run)
// Max size of tcp-relay pipe: 167758
// Median trace throughput: 6 traces/time unit
// Duration: 22373 ms
void performAnalysis(final int numWorkerThreads) {
final TcpTraceReconstructionAnalysisWithThreads analysis = new TcpTraceReconstructionAnalysisWithThreads();
analysis.setNumWorkerThreads(numWorkerThreads);
......@@ -78,7 +95,11 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
analysis.onTerminate();
}
System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize());
int maxSize = 0;
for (SpScPipe<IMonitoringRecord> pipe : analysis.getTcpRelayPipes()) {
maxSize = Math.max(maxSize, pipe.getMaxSize());
}
System.out.println("Max size of tcp-relay pipe: " + maxSize);
// Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays());
// System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record");
......@@ -88,9 +109,6 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit");
// assertEquals(1000, analysis.getNumTraces());
assertEquals(1000000, analysis.getNumTraces());
// TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
// assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
//
......@@ -98,7 +116,10 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
// assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
// assertEquals(21001, analysis.getNumRecords());
assertEquals(21000001, analysis.getNumRecords());
assertEquals("#records", 21000001, analysis.getNumRecords());
// assertEquals(1000, analysis.getNumTraces());
assertEquals("#traces", 1000000, analysis.getNumTraces());
}
public static void main(final String[] args) {
......
......@@ -33,7 +33,7 @@ import kieker.common.record.flow.IFlowRecord;
public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
private static final int TCP_RELAY_MAX_SIZE = 500000;
private static final int TCP_RELAY_MAX_SIZE = 10000000;
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
......@@ -42,27 +42,26 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
private Thread clock2Thread;
private Thread[] workerThreads;
private SpScPipe<IMonitoringRecord> tcpRelayPipe;
private int numWorkerThreads;
@Override
public void init() {
super.init();
StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline));
StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage(clockStage));
this.clockThread = new Thread(new RunnableStage<Void>(clockStage));
StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(2000);
this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
this.clock2Thread = new Thread(new RunnableStage<Void>(clock2Stage));
this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
this.workerThreads = new Thread[this.numWorkerThreads];
for (int i = 0; i < this.workerThreads.length; i++) {
StageWithPort<?, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage);
this.workerThreads[i] = new Thread(new RunnableStage<IMonitoringRecord>(pipeline));
}
}
......@@ -131,6 +130,8 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
private final StageFactory<CountingFilter<TraceEventRecords>> traceCounterFactory;
private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory;
private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new LinkedList<SpScPipe<IMonitoringRecord>>();
public TcpTraceReconstructionAnalysisWithThreads() {
try {
this.recordCounterFactory = new StageFactory(CountingFilter.class.getConstructor());
......@@ -145,8 +146,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
}
private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline,
final StageWithPort<Void, Long> clockStage,
final StageWithPort<Void, Long> clock2Stage) {
final StageWithPort<Void, Long> clockStage) {
// create stages
Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
CountingFilter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
......@@ -160,13 +160,13 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
// EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
// connect stages
this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
SysOutFilter<IMonitoringRecord> sysout = new SysOutFilter<IMonitoringRecord>(this.tcpRelayPipe);
SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
this.tcpRelayPipes.add(tcpRelayPipe);
// SysOutFilter<TraceEventRecords> sysout = new SysOutFilter<TraceEventRecords>(tcpRelayPipe);
// // SingleElementPipe.connect(relay.getOutputPort(), this.recordCounter.getInputPort());
// // SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(relay.getOutputPort(), recordCounter.getInputPort());
SingleElementPipe.connect(recordCounter.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(relay.getOutputPort(), sysout.getInputPort());
// SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort());
// SingleElementPipe.connect(relay.getOutputPort(), recordThroughputFilter.getInputPort());
......@@ -176,9 +176,11 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
// // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
// SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort());
// // SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort());
// SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort());
SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort());
// SingleElementPipe.connect(traceCounter.getOutputPort(), sysout.getInputPort());
// SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort());
SingleElementPipe.connect(traceCounter.getOutputPort(), endStage.getInputPort());
// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort());
// SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10);
......@@ -186,16 +188,16 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
SpScPipe.connect(clockStage.getOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
// create and configure pipeline
// Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(recordCounter);
// pipeline.addIntermediateStage(recordThroughputFilter);
// pipeline.addIntermediateStage(sysout);
pipeline.addIntermediateStage(instanceOfFilter);
// pipeline.addIntermediateStage(this.recordThroughputFilter);
pipeline.addIntermediateStage(traceReconstructionFilter);
pipeline.addIntermediateStage(traceThroughputFilter);
// pipeline.addIntermediateStage(this.traceCounter);
pipeline.addIntermediateStage(traceCounter);
// pipeline.addIntermediateStage(sysout);
pipeline.setLastStage(endStage);
return pipeline;
}
......@@ -261,8 +263,8 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
return throughputs;
}
public SpScPipe<IMonitoringRecord> getTcpRelayPipe() {
return this.tcpRelayPipe;
public List<SpScPipe<IMonitoringRecord>> getTcpRelayPipes() {
return this.tcpRelayPipes;
}
public int getNumWorkerThreads() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment