diff --git a/scripts/MooBench-cmd/startMooBench.cmd b/scripts/MooBench-cmd/startMooBench.cmd index 088146cf38e80707435462a9892cef977112983c..456f122ec63b94ec2551cf5bea7d6dee68fae997 100644 --- a/scripts/MooBench-cmd/startMooBench.cmd +++ b/scripts/MooBench-cmd/startMooBench.cmd @@ -1,8 +1,15 @@ +@echo off + +set runs=%1 +set calls=%2 + +if [%calls%] == [] ( + set calls=1000000 +) set cp=.;MooBench.jar;META-INF/kieker.monitoring.properties;META-INF/kieker.logging.properties set jvmParams=-javaagent:lib/kieker-1.9_aspectj.jar -Dorg.aspectj.weaver.loadtime.configuration=META-INF/kieker.aop.xml -Dorg.aspectj.weaver.showWeaveInfo=true -Daj.weaving.verbose=true -Dkieker.monitoring.writer=kieker.monitoring.writer.tcp.TCPWriter -set params=-d 10 -h 1 -m 0 -t 1000000 -o tmp/test.txt -q -set runs=%1 +set params=-d 10 -h 1 -m 0 -t %calls% -o tmp/test.txt -q for /l %%i in (1, 1, %runs%) do ( java -cp %cp% %jvmParams% mooBench.benchmark.Benchmark %params% diff --git a/src/main/java/teetime/util/ListUtil.java b/src/main/java/teetime/util/ListUtil.java index 15df95e36b9584a5d10edd80b7b5f031fa814105..bea4f12ebfcdb50f5b97cef016f455c37d3ed413 100644 --- a/src/main/java/teetime/util/ListUtil.java +++ b/src/main/java/teetime/util/ListUtil.java @@ -17,4 +17,11 @@ public class ListUtil { } return resultList; } + + public static <T> List<T> removeFirstHalfElements(final List<T> list) { + if (list.size() < 2) { + return list; + } + return list.subList(list.size() / 2 - 1, list.size()); + } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Counter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Counter.java index 51831541aeff5b13ac2383e9b37c0d20319def97..bd11514e87d2dfadfee9c8b9f66f0596496902ae 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Counter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Counter.java @@ -9,7 +9,7 @@ public class Counter<T> extends ConsumerStage<T, T> { @Override protected void execute5(final T element) { this.numElementsPassed++; - // this.logger.info("count: " + this.numElementsPassed); + // this.logger.debug("count: " + this.numElementsPassed); this.send(element); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceReductionFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceReductionFilter.java index 931a4e4f735657d88780932f0c7c742dd9421631..48570a3010c340c75cc0f50c6dc062497e0bbe21 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceReductionFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceReductionFilter.java @@ -52,25 +52,30 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords, Trace protected void execute5(final TraceEventRecords traceEventRecords) { Long timestampInNs = this.triggerInputPort.receive(); if (timestampInNs != null) { - synchronized (this) { - this.processTimeoutQueue(timestampInNs); - } + this.processTimeoutQueue(timestampInNs); } final long timestamp = System.nanoTime(); - synchronized (this) { - TraceAggregationBuffer traceBuffer = this.trace2buffer.get(traceEventRecords); - if (traceBuffer == null) { // NOCS (DCL) - traceBuffer = new TraceAggregationBuffer(timestamp, traceEventRecords); - this.trace2buffer.put(traceEventRecords, traceBuffer); + this.countSameTraces(traceEventRecords, timestamp); + } + + private void countSameTraces(final TraceEventRecords traceEventRecords, final long timestamp) { + TraceAggregationBuffer traceBuffer = this.trace2buffer.get(traceEventRecords); + if (traceBuffer == null) { + synchronized (this.trace2buffer) { + traceBuffer = this.trace2buffer.get(traceEventRecords); + if (traceBuffer == null) { // NOCS (DCL) + traceBuffer = new TraceAggregationBuffer(timestamp, traceEventRecords); + this.trace2buffer.put(traceEventRecords, traceBuffer); + } } - traceBuffer.count(); } + traceBuffer.count(); } @Override public void onIsPipelineHead() { - synchronized (this) { + synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) { final TraceAggregationBuffer buffer = entry.getValue(); final TraceEventRecords record = buffer.getTraceEventRecords(); @@ -85,16 +90,18 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords, Trace private void processTimeoutQueue(final long timestampInNs) { final long bufferTimeoutInNs = timestampInNs - this.maxCollectionDurationInNs; - for (final Iterator<Entry<TraceEventRecords, TraceAggregationBuffer>> iterator = this.trace2buffer.entrySet().iterator(); iterator.hasNext();) { - final TraceAggregationBuffer traceBuffer = iterator.next().getValue(); - // this.logger.debug("traceBuffer.getBufferCreatedTimestamp(): " + traceBuffer.getBufferCreatedTimestamp() + " vs. " + bufferTimeoutInNs - // + " (bufferTimeoutInNs)"); - if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeoutInNs) { - final TraceEventRecords record = traceBuffer.getTraceEventRecords(); - record.setCount(traceBuffer.getCount()); - this.send(record); + synchronized (this.trace2buffer) { + for (final Iterator<Entry<TraceEventRecords, TraceAggregationBuffer>> iterator = this.trace2buffer.entrySet().iterator(); iterator.hasNext();) { + final TraceAggregationBuffer traceBuffer = iterator.next().getValue(); + // this.logger.debug("traceBuffer.getBufferCreatedTimestamp(): " + traceBuffer.getBufferCreatedTimestamp() + " vs. " + bufferTimeoutInNs + // + " (bufferTimeoutInNs)"); + if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeoutInNs) { + final TraceEventRecords record = traceBuffer.getTraceEventRecords(); + record.setCount(traceBuffer.getCount()); + this.send(record); + } + iterator.remove(); } - iterator.remove(); } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java index fad06d80c5b9fe52768d88fda5a09b5b45b29049..bf0f636f81ca72aac98f332ab6cdf3d38cb78365 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -17,6 +17,7 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThre import static org.junit.Assert.assertEquals; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -26,6 +27,7 @@ import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.runners.MethodSorters; +import teetime.util.ListUtil; import teetime.util.StatisticsUtil; import teetime.util.StopWatch; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; @@ -103,7 +105,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { } System.out.println("Max size of tcp-relay pipe: " + maxSize); - // System.out.println("#trace meta data read: " + analysis.getNumTraceMetadatas()); + // System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas()); // System.out.println("Max #trace created: " + analysis.getMaxElementsCreated()); // Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays()); @@ -111,7 +113,9 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { // Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays()); // System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace"); - Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs()); + + List<Long> traceThroughputs = ListUtil.removeFirstHalfElements(analysis.getTraceThroughputs()); + Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(traceThroughputs); System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit"); // TraceEventRecords trace6884 = analysis.getElementCollection().get(0); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java index 0fb446fb74f08635ab75046507665e5fa24b24d3..b268f11ca0344fab6698dcfb6dd739c75b73d38b 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java @@ -156,11 +156,11 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); + // ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create(); InstanceCounter<IMonitoringRecord, TraceMetadata> traceMetadataCounter = this.traceMetadataCounterFactory.create(TraceMetadata.class); new InstanceCounter<IMonitoringRecord, TraceMetadata>(TraceMetadata.class); final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); - // ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create(); final TraceReconstructionFilter traceReconstructionFilter = this.traceReconstructionFilterFactory.create(this.traceId2trace); Counter<TraceEventRecords> traceCounter = this.traceCounterFactory.create(); ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create(); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java index 19c277b88055e601d097ed88af4c3623a54ebe1f..61907ac6ec191a7521e1359b5bc848e8ed0ecc93 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java @@ -17,6 +17,7 @@ package teetime.variant.methodcallWithPorts.examples.traceReductionWithThreads; import static org.junit.Assert.assertEquals; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -26,6 +27,7 @@ import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.runners.MethodSorters; +import teetime.util.ListUtil; import teetime.util.StatisticsUtil; import teetime.util.StopWatch; @@ -37,6 +39,9 @@ import teetime.util.StopWatch; @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class ChwWorkTcpTraceReductionAnalysisWithThreadsTest { + private static final int EXPECTED_NUM_TRACES = 1000000; + private static final int EXPECTED_NUM_SAME_TRACES = 1; + private StopWatch stopWatch; @Before @@ -79,26 +84,27 @@ public class ChwWorkTcpTraceReductionAnalysisWithThreadsTest { } System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize()); + // System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas()); + // System.out.println("Max #trace created: " + analysis.getMaxElementsCreated()); + System.out.println("TraceThroughputs: " + analysis.getTraceThroughputs()); // Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays()); // System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record"); // Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays()); // System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace"); - Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs()); + + List<Long> traceThroughputs = ListUtil.removeFirstHalfElements(analysis.getTraceThroughputs()); + Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(traceThroughputs); System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit"); - // assertEquals(1000, analysis.getNumTraces()); - assertEquals(1000000, analysis.getNumTraces()); + assertEquals("#records", 21000001, analysis.getNumRecords()); - // TraceEventRecords trace6884 = analysis.getElementCollection().get(0); - // assertEquals(6884, trace6884.getTraceMetadata().getTraceId()); - // - // TraceEventRecords trace6886 = analysis.getElementCollection().get(1); - // assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); + for (Integer count : analysis.getNumTraceMetadatas()) { + assertEquals("#traceMetadata per worker thread", EXPECTED_NUM_TRACES / numWorkerThreads, count.intValue()); // even distribution + } - // assertEquals(21001, analysis.getNumRecords()); - assertEquals(21000001, analysis.getNumRecords()); + assertEquals("#traces", EXPECTED_NUM_SAME_TRACES, analysis.getNumTraces()); } public static void main(final String[] args) { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java index 070aac3c5d46124469ac7561351bba3142c0f8ae..c93ffd300dc01ccdd3ac464b1921ebd0bdd04650 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java @@ -21,6 +21,7 @@ import teetime.variant.methodcallWithPorts.stage.Counter; import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage; import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage; import teetime.variant.methodcallWithPorts.stage.EndStage; +import teetime.variant.methodcallWithPorts.stage.InstanceCounter; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; import teetime.variant.methodcallWithPorts.stage.Relay; import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor; @@ -33,6 +34,7 @@ import teetime.variant.methodcallWithPorts.stage.kieker.traceReduction.TraceRedu import kieker.analysis.plugin.filter.flow.TraceEventRecords; import kieker.common.record.IMonitoringRecord; import kieker.common.record.flow.IFlowRecord; +import kieker.common.record.flow.trace.TraceMetadata; public class TcpTraceReductionAnalysisWithThreads extends Analysis { @@ -133,6 +135,7 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { private final StageFactory<Counter<IMonitoringRecord>> recordCounterFactory; private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory; + private final StageFactory<InstanceCounter<IMonitoringRecord, TraceMetadata>> traceMetadataCounterFactory; private final StageFactory<Counter<TraceEventRecords>> traceCounterFactory; private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory; @@ -141,6 +144,7 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { try { this.recordCounterFactory = new StageFactory(Counter.class.getConstructor()); this.recordThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor()); + this.traceMetadataCounterFactory = new StageFactory(InstanceCounter.class.getConstructor(Class.class)); this.traceCounterFactory = new StageFactory(Counter.class.getConstructor()); this.traceThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor()); } catch (NoSuchMethodException e) { @@ -155,24 +159,27 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { final StageWithPort<Void, Long> clock2Stage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); - // Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); + Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); + InstanceCounter<IMonitoringRecord, TraceMetadata> traceMetadataCounter = this.traceMetadataCounterFactory.create(TraceMetadata.class); final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); // ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create(); final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace); TraceReductionFilter traceReductionFilter = new TraceReductionFilter(this.trace2buffer); - // Counter<TraceEventRecords> traceCounter = this.traceCounterFactory.create(); + Counter<TraceEventRecords> traceCounter = this.traceCounterFactory.create(); ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create(); EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); // connect stages this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); - SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); + SingleElementPipe.connect(relay.getOutputPort(), recordCounter.getInputPort()); + SingleElementPipe.connect(recordCounter.getOutputPort(), traceMetadataCounter.getInputPort()); + SingleElementPipe.connect(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort()); SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceReductionFilter.getInputPort()); - SingleElementPipe.connect(traceReductionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); + SingleElementPipe.connect(traceReductionFilter.getOutputPort(), traceCounter.getInputPort()); + SingleElementPipe.connect(traceCounter.getOutputPort(), traceThroughputFilter.getInputPort()); SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort()); // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); @@ -184,9 +191,12 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { // create and configure pipeline Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>(); pipeline.setFirstStage(relay); + pipeline.addIntermediateStage(recordCounter); + pipeline.addIntermediateStage(traceMetadataCounter); pipeline.addIntermediateStage(instanceOfFilter); pipeline.addIntermediateStage(traceReconstructionFilter); pipeline.addIntermediateStage(traceReductionFilter); + pipeline.addIntermediateStage(traceCounter); pipeline.addIntermediateStage(traceThroughputFilter); pipeline.setLastStage(endStage); return pipeline; @@ -198,7 +208,7 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { this.tcpThread.start(); this.clockThread.start(); - this.clock2Thread.start(); + // this.clock2Thread.start(); for (Thread workerThread : this.workerThreads) { workerThread.start(); @@ -265,4 +275,12 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { this.numWorkerThreads = numWorkerThreads; } + public List<Integer> getNumTraceMetadatas() { + List<Integer> numTraceMetadatas = new LinkedList<Integer>(); + for (InstanceCounter<IMonitoringRecord, TraceMetadata> stage : this.traceMetadataCounterFactory.getStages()) { + numTraceMetadatas.add(stage.getCounter()); + } + return numTraceMetadatas; + } + }