Skip to content
Snippets Groups Projects
Commit 8acbf443 authored by Christian Wulf's avatar Christian Wulf
Browse files

added parameter to startMooBench.cmd;

fixed concurrency bug in TraceReductionFilter
parent 2f80c48e
No related branches found
No related tags found
No related merge requests found
Showing
with 90 additions and 41 deletions
@echo off
set runs=%1
set calls=%2
if [%calls%] == [] (
set calls=1000000
)
set cp=.;MooBench.jar;META-INF/kieker.monitoring.properties;META-INF/kieker.logging.properties
set jvmParams=-javaagent:lib/kieker-1.9_aspectj.jar -Dorg.aspectj.weaver.loadtime.configuration=META-INF/kieker.aop.xml -Dorg.aspectj.weaver.showWeaveInfo=true -Daj.weaving.verbose=true -Dkieker.monitoring.writer=kieker.monitoring.writer.tcp.TCPWriter
set params=-d 10 -h 1 -m 0 -t 1000000 -o tmp/test.txt -q
set runs=%1
set params=-d 10 -h 1 -m 0 -t %calls% -o tmp/test.txt -q
for /l %%i in (1, 1, %runs%) do (
java -cp %cp% %jvmParams% mooBench.benchmark.Benchmark %params%
......
......@@ -17,4 +17,11 @@ public class ListUtil {
}
return resultList;
}
public static <T> List<T> removeFirstHalfElements(final List<T> list) {
if (list.size() < 2) {
return list;
}
return list.subList(list.size() / 2 - 1, list.size());
}
}
......@@ -9,7 +9,7 @@ public class Counter<T> extends ConsumerStage<T, T> {
@Override
protected void execute5(final T element) {
this.numElementsPassed++;
// this.logger.info("count: " + this.numElementsPassed);
// this.logger.debug("count: " + this.numElementsPassed);
this.send(element);
}
......
......@@ -52,25 +52,30 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords, Trace
protected void execute5(final TraceEventRecords traceEventRecords) {
Long timestampInNs = this.triggerInputPort.receive();
if (timestampInNs != null) {
synchronized (this) {
this.processTimeoutQueue(timestampInNs);
}
this.processTimeoutQueue(timestampInNs);
}
final long timestamp = System.nanoTime();
synchronized (this) {
TraceAggregationBuffer traceBuffer = this.trace2buffer.get(traceEventRecords);
if (traceBuffer == null) { // NOCS (DCL)
traceBuffer = new TraceAggregationBuffer(timestamp, traceEventRecords);
this.trace2buffer.put(traceEventRecords, traceBuffer);
this.countSameTraces(traceEventRecords, timestamp);
}
private void countSameTraces(final TraceEventRecords traceEventRecords, final long timestamp) {
TraceAggregationBuffer traceBuffer = this.trace2buffer.get(traceEventRecords);
if (traceBuffer == null) {
synchronized (this.trace2buffer) {
traceBuffer = this.trace2buffer.get(traceEventRecords);
if (traceBuffer == null) { // NOCS (DCL)
traceBuffer = new TraceAggregationBuffer(timestamp, traceEventRecords);
this.trace2buffer.put(traceEventRecords, traceBuffer);
}
}
traceBuffer.count();
}
traceBuffer.count();
}
@Override
public void onIsPipelineHead() {
synchronized (this) {
synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer
for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) {
final TraceAggregationBuffer buffer = entry.getValue();
final TraceEventRecords record = buffer.getTraceEventRecords();
......@@ -85,16 +90,18 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords, Trace
private void processTimeoutQueue(final long timestampInNs) {
final long bufferTimeoutInNs = timestampInNs - this.maxCollectionDurationInNs;
for (final Iterator<Entry<TraceEventRecords, TraceAggregationBuffer>> iterator = this.trace2buffer.entrySet().iterator(); iterator.hasNext();) {
final TraceAggregationBuffer traceBuffer = iterator.next().getValue();
// this.logger.debug("traceBuffer.getBufferCreatedTimestamp(): " + traceBuffer.getBufferCreatedTimestamp() + " vs. " + bufferTimeoutInNs
// + " (bufferTimeoutInNs)");
if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeoutInNs) {
final TraceEventRecords record = traceBuffer.getTraceEventRecords();
record.setCount(traceBuffer.getCount());
this.send(record);
synchronized (this.trace2buffer) {
for (final Iterator<Entry<TraceEventRecords, TraceAggregationBuffer>> iterator = this.trace2buffer.entrySet().iterator(); iterator.hasNext();) {
final TraceAggregationBuffer traceBuffer = iterator.next().getValue();
// this.logger.debug("traceBuffer.getBufferCreatedTimestamp(): " + traceBuffer.getBufferCreatedTimestamp() + " vs. " + bufferTimeoutInNs
// + " (bufferTimeoutInNs)");
if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeoutInNs) {
final TraceEventRecords record = traceBuffer.getTraceEventRecords();
record.setCount(traceBuffer.getCount());
this.send(record);
}
iterator.remove();
}
iterator.remove();
}
}
......
......@@ -17,6 +17,7 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThre
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
......@@ -26,6 +27,7 @@ import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import teetime.util.ListUtil;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
......@@ -103,7 +105,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
}
System.out.println("Max size of tcp-relay pipe: " + maxSize);
// System.out.println("#trace meta data read: " + analysis.getNumTraceMetadatas());
// System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas());
// System.out.println("Max #trace created: " + analysis.getMaxElementsCreated());
// Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays());
......@@ -111,7 +113,9 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
// Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays());
// System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace");
Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
List<Long> traceThroughputs = ListUtil.removeFirstHalfElements(analysis.getTraceThroughputs());
Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(traceThroughputs);
System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit");
// TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
......
......@@ -156,11 +156,11 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
// create stages
Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
// ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create();
InstanceCounter<IMonitoringRecord, TraceMetadata> traceMetadataCounter = this.traceMetadataCounterFactory.create(TraceMetadata.class);
new InstanceCounter<IMonitoringRecord, TraceMetadata>(TraceMetadata.class);
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
// ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create();
final TraceReconstructionFilter traceReconstructionFilter = this.traceReconstructionFilterFactory.create(this.traceId2trace);
Counter<TraceEventRecords> traceCounter = this.traceCounterFactory.create();
ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create();
......
......@@ -17,6 +17,7 @@ package teetime.variant.methodcallWithPorts.examples.traceReductionWithThreads;
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
......@@ -26,6 +27,7 @@ import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import teetime.util.ListUtil;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
......@@ -37,6 +39,9 @@ import teetime.util.StopWatch;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ChwWorkTcpTraceReductionAnalysisWithThreadsTest {
private static final int EXPECTED_NUM_TRACES = 1000000;
private static final int EXPECTED_NUM_SAME_TRACES = 1;
private StopWatch stopWatch;
@Before
......@@ -79,26 +84,27 @@ public class ChwWorkTcpTraceReductionAnalysisWithThreadsTest {
}
System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize());
// System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas());
// System.out.println("Max #trace created: " + analysis.getMaxElementsCreated());
System.out.println("TraceThroughputs: " + analysis.getTraceThroughputs());
// Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays());
// System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record");
// Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays());
// System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace");
Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
List<Long> traceThroughputs = ListUtil.removeFirstHalfElements(analysis.getTraceThroughputs());
Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(traceThroughputs);
System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit");
// assertEquals(1000, analysis.getNumTraces());
assertEquals(1000000, analysis.getNumTraces());
assertEquals("#records", 21000001, analysis.getNumRecords());
// TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
// assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
//
// TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
// assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
for (Integer count : analysis.getNumTraceMetadatas()) {
assertEquals("#traceMetadata per worker thread", EXPECTED_NUM_TRACES / numWorkerThreads, count.intValue()); // even distribution
}
// assertEquals(21001, analysis.getNumRecords());
assertEquals(21000001, analysis.getNumRecords());
assertEquals("#traces", EXPECTED_NUM_SAME_TRACES, analysis.getNumTraces());
}
public static void main(final String[] args) {
......
......@@ -21,6 +21,7 @@ import teetime.variant.methodcallWithPorts.stage.Counter;
import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceCounter;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
......@@ -33,6 +34,7 @@ import teetime.variant.methodcallWithPorts.stage.kieker.traceReduction.TraceRedu
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.flow.IFlowRecord;
import kieker.common.record.flow.trace.TraceMetadata;
public class TcpTraceReductionAnalysisWithThreads extends Analysis {
......@@ -133,6 +135,7 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
private final StageFactory<Counter<IMonitoringRecord>> recordCounterFactory;
private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory;
private final StageFactory<InstanceCounter<IMonitoringRecord, TraceMetadata>> traceMetadataCounterFactory;
private final StageFactory<Counter<TraceEventRecords>> traceCounterFactory;
private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory;
......@@ -141,6 +144,7 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
try {
this.recordCounterFactory = new StageFactory(Counter.class.getConstructor());
this.recordThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor());
this.traceMetadataCounterFactory = new StageFactory(InstanceCounter.class.getConstructor(Class.class));
this.traceCounterFactory = new StageFactory(Counter.class.getConstructor());
this.traceThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor());
} catch (NoSuchMethodException e) {
......@@ -155,24 +159,27 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
final StageWithPort<Void, Long> clock2Stage) {
// create stages
Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
// Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
InstanceCounter<IMonitoringRecord, TraceMetadata> traceMetadataCounter = this.traceMetadataCounterFactory.create(TraceMetadata.class);
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
// ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace);
TraceReductionFilter traceReductionFilter = new TraceReductionFilter(this.trace2buffer);
// Counter<TraceEventRecords> traceCounter = this.traceCounterFactory.create();
Counter<TraceEventRecords> traceCounter = this.traceCounterFactory.create();
ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create();
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
// connect stages
this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(relay.getOutputPort(), recordCounter.getInputPort());
SingleElementPipe.connect(recordCounter.getOutputPort(), traceMetadataCounter.getInputPort());
SingleElementPipe.connect(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceReductionFilter.getInputPort());
SingleElementPipe.connect(traceReductionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
SingleElementPipe.connect(traceReductionFilter.getOutputPort(), traceCounter.getInputPort());
SingleElementPipe.connect(traceCounter.getOutputPort(), traceThroughputFilter.getInputPort());
SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort());
// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
......@@ -184,9 +191,12 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
// create and configure pipeline
Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(recordCounter);
pipeline.addIntermediateStage(traceMetadataCounter);
pipeline.addIntermediateStage(instanceOfFilter);
pipeline.addIntermediateStage(traceReconstructionFilter);
pipeline.addIntermediateStage(traceReductionFilter);
pipeline.addIntermediateStage(traceCounter);
pipeline.addIntermediateStage(traceThroughputFilter);
pipeline.setLastStage(endStage);
return pipeline;
......@@ -198,7 +208,7 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
this.tcpThread.start();
this.clockThread.start();
this.clock2Thread.start();
// this.clock2Thread.start();
for (Thread workerThread : this.workerThreads) {
workerThread.start();
......@@ -265,4 +275,12 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
this.numWorkerThreads = numWorkerThreads;
}
public List<Integer> getNumTraceMetadatas() {
List<Integer> numTraceMetadatas = new LinkedList<Integer>();
for (InstanceCounter<IMonitoringRecord, TraceMetadata> stage : this.traceMetadataCounterFactory.getStages()) {
numTraceMetadatas.add(stage.getCounter());
}
return numTraceMetadatas;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment