diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java index b5d337de8fa127d6666c73604dbe3cd03d2c89fc..fd2014a92c1dbb06c0055a091f105b63422e9b67 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java @@ -51,7 +51,8 @@ public class SpScPipe<T> extends AbstractPipe<T> { return this.queue.peek(); } - public int getMaxSize() { + // BETTER find a solution w/o any thread-safe code in this stage + public synchronized int getMaxSize() { return this.maxSize; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/CountingFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/CountingFilter.java index 39a1647715bd878663574b19c7e63c65f1e7eca3..b1fb1b2677b2b87c2868bf51f8dbf172f132db33 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/CountingFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/CountingFilter.java @@ -13,7 +13,8 @@ public class CountingFilter<T> extends ConsumerStage<T, T> { this.send(element); } - public int getNumElementsPassed() { + // BETTER find a solution w/o any thread-safe code in this stage + public synchronized int getNumElementsPassed() { return this.numElementsPassed; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java index a998feb81072e36e2763945aabc1c6ada27b7bf5..dcc893c4fe0caca1e0b930f02a449ce970b081c0 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java @@ -48,17 +48,22 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE @Override protected void execute5(final IFlowRecord element) { + // synchronized (this.traceId2trace) {// TODO remove if everything works final Long traceId = this.reconstructTrace(element); if (traceId != null) { this.putIfFinished(traceId); } + // } } private void putIfFinished(final Long traceId) { final TraceBuffer traceBuffer = this.traceId2trace.get(traceId); if (traceBuffer.isFinished()) { - this.traceId2trace.remove(traceId); - this.put(traceBuffer); + synchronized (this.traceId2trace) { + if (null != this.traceId2trace.remove(traceId)) { + this.put(traceBuffer); + } + } } } @@ -66,12 +71,12 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE Long traceId = null; if (record instanceof TraceMetadata) { traceId = ((TraceMetadata) record).getTraceId(); - final TraceBuffer traceBuffer = this.traceId2trace.get(traceId); + TraceBuffer traceBuffer = this.traceId2trace.get(traceId); traceBuffer.setTrace((TraceMetadata) record); } else if (record instanceof AbstractTraceEvent) { traceId = ((AbstractTraceEvent) record).getTraceId(); - final TraceBuffer traceBuffer = this.traceId2trace.get(traceId); + TraceBuffer traceBuffer = this.traceId2trace.get(traceId); traceBuffer.insertEvent((AbstractTraceEvent) record); } @@ -81,11 +86,15 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE @Override public void onIsPipelineHead() { - Iterator<TraceBuffer> iterator = this.traceId2trace.values().iterator(); - while (iterator.hasNext()) { - TraceBuffer traceBuffer = iterator.next(); - this.put(traceBuffer); - iterator.remove(); + synchronized (this.traceId2trace) { + Iterator<TraceBuffer> iterator = this.traceId2trace.values().iterator(); + while (iterator.hasNext()) { + TraceBuffer traceBuffer = iterator.next(); + if (traceBuffer.isFinished()) { // FIXME remove isFinished + this.put(traceBuffer); + iterator.remove(); + } + } } super.onIsPipelineHead(); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java index eb732012260a10690fde4faad8e4aaba82849ccf..cfa1d8a2d4d7aa9de21c775f8a0368f7906b44e4 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java @@ -25,7 +25,7 @@ public class TcpTraceLogging extends Analysis { public void init() { super.init(); StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline(); - this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); + this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline)); } private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java index 040cacb99fde3a3efbe9c4d3ce646ee0a1621b30..3c1022370c204f80059dc89b7d3324f719d9337b 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java @@ -74,6 +74,12 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest { // assertEquals(21001, analysis.getNumRecords()); assertEquals(21000001, analysis.getNumRecords()); + + // until 04.07.2014 (inkl.) + // Median throughput: 74 elements/time unit + // Duration: 17445 ms + // Median throughput: 78 elements/time unit + // Duration: 16608 ms } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java index 668e263cd41654fd1501f8ceb01e59bb7618d2f3..a8130d71d5d6c38bb781deed0cfca6d362a58717 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -28,6 +28,9 @@ import org.junit.runners.MethodSorters; import teetime.util.StatisticsUtil; import teetime.util.StopWatch; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; + +import kieker.common.record.IMonitoringRecord; /** * @author Christian Wulf @@ -65,6 +68,20 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { this.performAnalysis(4); } + // until 04.07.2014 (incl.) + // Max size of tcp-relay pipe: 143560 + // Median trace throughput: 115 traces/time unit + // Duration: 12907 ms + + // Max size of tcp-relay pipe: 51948 + // Median trace throughput: 42 traces/time unit + // Duration: 21614 ms + + // [2014-07-04 01:06:10 PM] WARNUNG: Reader interrupted (teetime.variant.methodcallWithPorts.stage.io.TCPReader$TCPStringReader run) + // Max size of tcp-relay pipe: 167758 + // Median trace throughput: 6 traces/time unit + // Duration: 22373 ms + void performAnalysis(final int numWorkerThreads) { final TcpTraceReconstructionAnalysisWithThreads analysis = new TcpTraceReconstructionAnalysisWithThreads(); analysis.setNumWorkerThreads(numWorkerThreads); @@ -78,7 +95,11 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { analysis.onTerminate(); } - System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize()); + int maxSize = 0; + for (SpScPipe<IMonitoringRecord> pipe : analysis.getTcpRelayPipes()) { + maxSize = Math.max(maxSize, pipe.getMaxSize()); + } + System.out.println("Max size of tcp-relay pipe: " + maxSize); // Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays()); // System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record"); @@ -88,9 +109,6 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs()); System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit"); - // assertEquals(1000, analysis.getNumTraces()); - assertEquals(1000000, analysis.getNumTraces()); - // TraceEventRecords trace6884 = analysis.getElementCollection().get(0); // assertEquals(6884, trace6884.getTraceMetadata().getTraceId()); // @@ -98,7 +116,10 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { // assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); // assertEquals(21001, analysis.getNumRecords()); - assertEquals(21000001, analysis.getNumRecords()); + assertEquals("#records", 21000001, analysis.getNumRecords()); + + // assertEquals(1000, analysis.getNumTraces()); + assertEquals("#traces", 1000000, analysis.getNumTraces()); } public static void main(final String[] args) { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java index 2c13357ceef5a9a7011d448994767da044c905d6..987ed2a12607e42d95ad660479e128393f4b08ad 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java @@ -33,7 +33,7 @@ import kieker.common.record.flow.IFlowRecord; public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors(); - private static final int TCP_RELAY_MAX_SIZE = 500000; + private static final int TCP_RELAY_MAX_SIZE = 10000000; private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); @@ -42,27 +42,26 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { private Thread clock2Thread; private Thread[] workerThreads; - private SpScPipe<IMonitoringRecord> tcpRelayPipe; private int numWorkerThreads; @Override public void init() { super.init(); StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline(); - this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); + this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline)); StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000); - this.clockThread = new Thread(new RunnableStage(clockStage)); + this.clockThread = new Thread(new RunnableStage<Void>(clockStage)); StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(2000); - this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); + this.clock2Thread = new Thread(new RunnableStage<Void>(clock2Stage)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); this.workerThreads = new Thread[this.numWorkerThreads]; for (int i = 0; i < this.workerThreads.length; i++) { - StageWithPort<?, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); - this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); + StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage); + this.workerThreads[i] = new Thread(new RunnableStage<IMonitoringRecord>(pipeline)); } } @@ -131,6 +130,8 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { private final StageFactory<CountingFilter<TraceEventRecords>> traceCounterFactory; private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory; + private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new LinkedList<SpScPipe<IMonitoringRecord>>(); + public TcpTraceReconstructionAnalysisWithThreads() { try { this.recordCounterFactory = new StageFactory(CountingFilter.class.getConstructor()); @@ -145,8 +146,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { } private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline, - final StageWithPort<Void, Long> clockStage, - final StageWithPort<Void, Long> clock2Stage) { + final StageWithPort<Void, Long> clockStage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); CountingFilter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); @@ -160,13 +160,13 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { // EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>(); // connect stages - this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); - - SysOutFilter<IMonitoringRecord> sysout = new SysOutFilter<IMonitoringRecord>(this.tcpRelayPipe); + SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); + this.tcpRelayPipes.add(tcpRelayPipe); + // SysOutFilter<TraceEventRecords> sysout = new SysOutFilter<TraceEventRecords>(tcpRelayPipe); - // // SingleElementPipe.connect(relay.getOutputPort(), this.recordCounter.getInputPort()); - // // SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); - SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); + SingleElementPipe.connect(relay.getOutputPort(), recordCounter.getInputPort()); + SingleElementPipe.connect(recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); + // SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); // SingleElementPipe.connect(relay.getOutputPort(), sysout.getInputPort()); // SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort()); // SingleElementPipe.connect(relay.getOutputPort(), recordThroughputFilter.getInputPort()); @@ -176,9 +176,11 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { // // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); - // SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); - SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort()); - // // SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort()); + // SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort()); + SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort()); + // SingleElementPipe.connect(traceCounter.getOutputPort(), sysout.getInputPort()); + // SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort()); + SingleElementPipe.connect(traceCounter.getOutputPort(), endStage.getInputPort()); // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort()); // SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10); @@ -186,16 +188,16 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { SpScPipe.connect(clockStage.getOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); // create and configure pipeline - // Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>(); Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>(); pipeline.setFirstStage(relay); + pipeline.addIntermediateStage(recordCounter); // pipeline.addIntermediateStage(recordThroughputFilter); - // pipeline.addIntermediateStage(sysout); pipeline.addIntermediateStage(instanceOfFilter); // pipeline.addIntermediateStage(this.recordThroughputFilter); pipeline.addIntermediateStage(traceReconstructionFilter); pipeline.addIntermediateStage(traceThroughputFilter); - // pipeline.addIntermediateStage(this.traceCounter); + pipeline.addIntermediateStage(traceCounter); + // pipeline.addIntermediateStage(sysout); pipeline.setLastStage(endStage); return pipeline; } @@ -261,8 +263,8 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { return throughputs; } - public SpScPipe<IMonitoringRecord> getTcpRelayPipe() { - return this.tcpRelayPipe; + public List<SpScPipe<IMonitoringRecord>> getTcpRelayPipes() { + return this.tcpRelayPipes; } public int getNumWorkerThreads() {