From 8acbf443e1271a0dff606da3790e5b1187e1b3e7 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Sun, 6 Jul 2014 05:06:51 +0200
Subject: [PATCH] added parameter to startMooBench.cmd; fixed concurrency bug
in TraceReductionFilter
---
scripts/MooBench-cmd/startMooBench.cmd | 11 ++++-
src/main/java/teetime/util/ListUtil.java | 7 +++
.../methodcallWithPorts/stage/Counter.java | 2 +-
.../traceReduction/TraceReductionFilter.java | 45 +++++++++++--------
...ReconstructionAnalysisWithThreadsTest.java | 8 +++-
...raceReconstructionAnalysisWithThreads.java | 2 +-
...TraceReductionAnalysisWithThreadsTest.java | 26 ++++++-----
.../TcpTraceReductionAnalysisWithThreads.java | 30 ++++++++++---
8 files changed, 90 insertions(+), 41 deletions(-)
diff --git a/scripts/MooBench-cmd/startMooBench.cmd b/scripts/MooBench-cmd/startMooBench.cmd
index 088146cf..456f122e 100644
--- a/scripts/MooBench-cmd/startMooBench.cmd
+++ b/scripts/MooBench-cmd/startMooBench.cmd
@@ -1,8 +1,15 @@
+@echo off
+
+set runs=%1
+set calls=%2
+
+if [%calls%] == [] (
+ set calls=1000000
+)
set cp=.;MooBench.jar;META-INF/kieker.monitoring.properties;META-INF/kieker.logging.properties
set jvmParams=-javaagent:lib/kieker-1.9_aspectj.jar -Dorg.aspectj.weaver.loadtime.configuration=META-INF/kieker.aop.xml -Dorg.aspectj.weaver.showWeaveInfo=true -Daj.weaving.verbose=true -Dkieker.monitoring.writer=kieker.monitoring.writer.tcp.TCPWriter
-set params=-d 10 -h 1 -m 0 -t 1000000 -o tmp/test.txt -q
-set runs=%1
+set params=-d 10 -h 1 -m 0 -t %calls% -o tmp/test.txt -q
for /l %%i in (1, 1, %runs%) do (
java -cp %cp% %jvmParams% mooBench.benchmark.Benchmark %params%
diff --git a/src/main/java/teetime/util/ListUtil.java b/src/main/java/teetime/util/ListUtil.java
index 15df95e3..bea4f12e 100644
--- a/src/main/java/teetime/util/ListUtil.java
+++ b/src/main/java/teetime/util/ListUtil.java
@@ -17,4 +17,11 @@ public class ListUtil {
}
return resultList;
}
+
+ public static <T> List<T> removeFirstHalfElements(final List<T> list) {
+ if (list.size() < 2) {
+ return list;
+ }
+ return list.subList(list.size() / 2 - 1, list.size());
+ }
}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Counter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Counter.java
index 51831541..bd11514e 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Counter.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Counter.java
@@ -9,7 +9,7 @@ public class Counter<T> extends ConsumerStage<T, T> {
@Override
protected void execute5(final T element) {
this.numElementsPassed++;
- // this.logger.info("count: " + this.numElementsPassed);
+ // this.logger.debug("count: " + this.numElementsPassed);
this.send(element);
}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceReductionFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceReductionFilter.java
index 931a4e4f..48570a30 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceReductionFilter.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceReductionFilter.java
@@ -52,25 +52,30 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords, Trace
protected void execute5(final TraceEventRecords traceEventRecords) {
Long timestampInNs = this.triggerInputPort.receive();
if (timestampInNs != null) {
- synchronized (this) {
- this.processTimeoutQueue(timestampInNs);
- }
+ this.processTimeoutQueue(timestampInNs);
}
final long timestamp = System.nanoTime();
- synchronized (this) {
- TraceAggregationBuffer traceBuffer = this.trace2buffer.get(traceEventRecords);
- if (traceBuffer == null) { // NOCS (DCL)
- traceBuffer = new TraceAggregationBuffer(timestamp, traceEventRecords);
- this.trace2buffer.put(traceEventRecords, traceBuffer);
+ this.countSameTraces(traceEventRecords, timestamp);
+ }
+
+ private void countSameTraces(final TraceEventRecords traceEventRecords, final long timestamp) {
+ TraceAggregationBuffer traceBuffer = this.trace2buffer.get(traceEventRecords);
+ if (traceBuffer == null) {
+ synchronized (this.trace2buffer) {
+ traceBuffer = this.trace2buffer.get(traceEventRecords);
+ if (traceBuffer == null) { // NOCS (DCL)
+ traceBuffer = new TraceAggregationBuffer(timestamp, traceEventRecords);
+ this.trace2buffer.put(traceEventRecords, traceBuffer);
+ }
}
- traceBuffer.count();
}
+ traceBuffer.count();
}
@Override
public void onIsPipelineHead() {
- synchronized (this) {
+ synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer
for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) {
final TraceAggregationBuffer buffer = entry.getValue();
final TraceEventRecords record = buffer.getTraceEventRecords();
@@ -85,16 +90,18 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords, Trace
private void processTimeoutQueue(final long timestampInNs) {
final long bufferTimeoutInNs = timestampInNs - this.maxCollectionDurationInNs;
- for (final Iterator<Entry<TraceEventRecords, TraceAggregationBuffer>> iterator = this.trace2buffer.entrySet().iterator(); iterator.hasNext();) {
- final TraceAggregationBuffer traceBuffer = iterator.next().getValue();
- // this.logger.debug("traceBuffer.getBufferCreatedTimestamp(): " + traceBuffer.getBufferCreatedTimestamp() + " vs. " + bufferTimeoutInNs
- // + " (bufferTimeoutInNs)");
- if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeoutInNs) {
- final TraceEventRecords record = traceBuffer.getTraceEventRecords();
- record.setCount(traceBuffer.getCount());
- this.send(record);
+ synchronized (this.trace2buffer) {
+ for (final Iterator<Entry<TraceEventRecords, TraceAggregationBuffer>> iterator = this.trace2buffer.entrySet().iterator(); iterator.hasNext();) {
+ final TraceAggregationBuffer traceBuffer = iterator.next().getValue();
+ // this.logger.debug("traceBuffer.getBufferCreatedTimestamp(): " + traceBuffer.getBufferCreatedTimestamp() + " vs. " + bufferTimeoutInNs
+ // + " (bufferTimeoutInNs)");
+ if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeoutInNs) {
+ final TraceEventRecords record = traceBuffer.getTraceEventRecords();
+ record.setCount(traceBuffer.getCount());
+ this.send(record);
+ }
+ iterator.remove();
}
- iterator.remove();
}
}
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
index fad06d80..bf0f636f 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
@@ -17,6 +17,7 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThre
import static org.junit.Assert.assertEquals;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -26,6 +27,7 @@ import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
+import teetime.util.ListUtil;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
@@ -103,7 +105,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
}
System.out.println("Max size of tcp-relay pipe: " + maxSize);
- // System.out.println("#trace meta data read: " + analysis.getNumTraceMetadatas());
+ // System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas());
// System.out.println("Max #trace created: " + analysis.getMaxElementsCreated());
// Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays());
@@ -111,7 +113,9 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
// Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays());
// System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace");
- Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
+
+ List<Long> traceThroughputs = ListUtil.removeFirstHalfElements(analysis.getTraceThroughputs());
+ Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(traceThroughputs);
System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit");
// TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
index 0fb446fb..b268f11c 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
@@ -156,11 +156,11 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
// create stages
Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
+ // ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create();
InstanceCounter<IMonitoringRecord, TraceMetadata> traceMetadataCounter = this.traceMetadataCounterFactory.create(TraceMetadata.class);
new InstanceCounter<IMonitoringRecord, TraceMetadata>(TraceMetadata.class);
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
- // ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create();
final TraceReconstructionFilter traceReconstructionFilter = this.traceReconstructionFilterFactory.create(this.traceId2trace);
Counter<TraceEventRecords> traceCounter = this.traceCounterFactory.create();
ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create();
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java
index 19c277b8..61907ac6 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java
@@ -17,6 +17,7 @@ package teetime.variant.methodcallWithPorts.examples.traceReductionWithThreads;
import static org.junit.Assert.assertEquals;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -26,6 +27,7 @@ import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
+import teetime.util.ListUtil;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
@@ -37,6 +39,9 @@ import teetime.util.StopWatch;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ChwWorkTcpTraceReductionAnalysisWithThreadsTest {
+ private static final int EXPECTED_NUM_TRACES = 1000000;
+ private static final int EXPECTED_NUM_SAME_TRACES = 1;
+
private StopWatch stopWatch;
@Before
@@ -79,26 +84,27 @@ public class ChwWorkTcpTraceReductionAnalysisWithThreadsTest {
}
System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize());
+ // System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas());
+ // System.out.println("Max #trace created: " + analysis.getMaxElementsCreated());
+ System.out.println("TraceThroughputs: " + analysis.getTraceThroughputs());
// Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays());
// System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record");
// Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays());
// System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace");
- Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
+
+ List<Long> traceThroughputs = ListUtil.removeFirstHalfElements(analysis.getTraceThroughputs());
+ Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(traceThroughputs);
System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit");
- // assertEquals(1000, analysis.getNumTraces());
- assertEquals(1000000, analysis.getNumTraces());
+ assertEquals("#records", 21000001, analysis.getNumRecords());
- // TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
- // assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
- //
- // TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
- // assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
+ for (Integer count : analysis.getNumTraceMetadatas()) {
+ assertEquals("#traceMetadata per worker thread", EXPECTED_NUM_TRACES / numWorkerThreads, count.intValue()); // even distribution
+ }
- // assertEquals(21001, analysis.getNumRecords());
- assertEquals(21000001, analysis.getNumRecords());
+ assertEquals("#traces", EXPECTED_NUM_SAME_TRACES, analysis.getNumTraces());
}
public static void main(final String[] args) {
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
index 070aac3c..c93ffd30 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
@@ -21,6 +21,7 @@ import teetime.variant.methodcallWithPorts.stage.Counter;
import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
+import teetime.variant.methodcallWithPorts.stage.InstanceCounter;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
@@ -33,6 +34,7 @@ import teetime.variant.methodcallWithPorts.stage.kieker.traceReduction.TraceRedu
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.flow.IFlowRecord;
+import kieker.common.record.flow.trace.TraceMetadata;
public class TcpTraceReductionAnalysisWithThreads extends Analysis {
@@ -133,6 +135,7 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
private final StageFactory<Counter<IMonitoringRecord>> recordCounterFactory;
private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory;
+ private final StageFactory<InstanceCounter<IMonitoringRecord, TraceMetadata>> traceMetadataCounterFactory;
private final StageFactory<Counter<TraceEventRecords>> traceCounterFactory;
private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory;
@@ -141,6 +144,7 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
try {
this.recordCounterFactory = new StageFactory(Counter.class.getConstructor());
this.recordThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor());
+ this.traceMetadataCounterFactory = new StageFactory(InstanceCounter.class.getConstructor(Class.class));
this.traceCounterFactory = new StageFactory(Counter.class.getConstructor());
this.traceThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor());
} catch (NoSuchMethodException e) {
@@ -155,24 +159,27 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
final StageWithPort<Void, Long> clock2Stage) {
// create stages
Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
- // Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
+ Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
+ InstanceCounter<IMonitoringRecord, TraceMetadata> traceMetadataCounter = this.traceMetadataCounterFactory.create(TraceMetadata.class);
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
// ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace);
TraceReductionFilter traceReductionFilter = new TraceReductionFilter(this.trace2buffer);
- // Counter<TraceEventRecords> traceCounter = this.traceCounterFactory.create();
+ Counter<TraceEventRecords> traceCounter = this.traceCounterFactory.create();
ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create();
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
// connect stages
this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
- SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
+ SingleElementPipe.connect(relay.getOutputPort(), recordCounter.getInputPort());
+ SingleElementPipe.connect(recordCounter.getOutputPort(), traceMetadataCounter.getInputPort());
+ SingleElementPipe.connect(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
-
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceReductionFilter.getInputPort());
- SingleElementPipe.connect(traceReductionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
+ SingleElementPipe.connect(traceReductionFilter.getOutputPort(), traceCounter.getInputPort());
+ SingleElementPipe.connect(traceCounter.getOutputPort(), traceThroughputFilter.getInputPort());
SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort());
// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
@@ -184,9 +191,12 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
// create and configure pipeline
Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
pipeline.setFirstStage(relay);
+ pipeline.addIntermediateStage(recordCounter);
+ pipeline.addIntermediateStage(traceMetadataCounter);
pipeline.addIntermediateStage(instanceOfFilter);
pipeline.addIntermediateStage(traceReconstructionFilter);
pipeline.addIntermediateStage(traceReductionFilter);
+ pipeline.addIntermediateStage(traceCounter);
pipeline.addIntermediateStage(traceThroughputFilter);
pipeline.setLastStage(endStage);
return pipeline;
@@ -198,7 +208,7 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
this.tcpThread.start();
this.clockThread.start();
- this.clock2Thread.start();
+ // this.clock2Thread.start();
for (Thread workerThread : this.workerThreads) {
workerThread.start();
@@ -265,4 +275,12 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
this.numWorkerThreads = numWorkerThreads;
}
+ public List<Integer> getNumTraceMetadatas() {
+ List<Integer> numTraceMetadatas = new LinkedList<Integer>();
+ for (InstanceCounter<IMonitoringRecord, TraceMetadata> stage : this.traceMetadataCounterFactory.getStages()) {
+ numTraceMetadatas.add(stage.getCounter());
+ }
+ return numTraceMetadatas;
+ }
+
}
--
GitLab