Skip to content
Snippets Groups Projects
Commit 3ae28329 authored by Christian Wulf's avatar Christian Wulf
Browse files

added TraceReductionFilter and test

parent 2d5d8dfc
No related branches found
No related tags found
No related merge requests found
Showing
with 680 additions and 46 deletions
......@@ -8,5 +8,5 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
#teetime.level = ALL
teetime.variant.methodcallWithPorts.framework.core.level = ALL
teetime.variant.methodcallWithPorts.stage.level = WARNING
teetime.variant.methodcallWithPorts.stage.level = FINE
teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
......@@ -4,8 +4,8 @@ java=~/jdk1.7.0_60/bin/java
cp=.:MooBench.jar:META-INF/kieker.monitoring.properties:META-INF/kieker.logging.properties
jvmParams="-javaagent:lib/kieker-1.9_aspectj.jar -Dorg.aspectj.weaver.loadtime.configuration=META-INF/kieker.aop.xml -Dorg.aspectj.weaver.showWeaveInfo=true -Daj.weaving.verbose=true -Dkieker.monitoring.writer=kieker.monitoring.writer.tcp.TCPWriter"
params="-d 10 -h 1 -m 0 -t 1000000 -o tmp/test.txt -q"
runs=$1
#runs=$1
for i in {1..${runs}}; do
for i in {1..3}; do
${java} -cp ${cp} ${jvmParams} mooBench.benchmark.Benchmark ${params};
done
......@@ -2,6 +2,7 @@ package teetime.variant.methodcallWithPorts.stage;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
......@@ -17,12 +18,13 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
@Override
protected void execute5(final T element) {
this.numPassedElements++;
this.send(element);
Long timestampInNs = this.triggerInputPort.receive();
if (timestampInNs != null) {
this.computeElementThroughput(System.nanoTime());
}
this.numPassedElements++;
this.send(element);
}
@Override
......@@ -33,10 +35,11 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
private void computeElementThroughput(final Long timestampInNs) {
long diffInNs = timestampInNs - this.lastTimestampInNs;
if (diffInNs > 0) {
long throughputInNsPerElement = this.numPassedElements / diffInNs;
this.throughputs.add(throughputInNsPerElement);
this.logger.info("Throughput: " + throughputInNsPerElement + " elements/time unit");
long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
if (diffInSec > 0) {
long throughputPerSec = this.numPassedElements / diffInSec;
this.throughputs.add(throughputPerSec);
this.logger.info("Throughput: " + throughputPerSec + " elements/s" + " -> numPassedElements=" + this.numPassedElements);
this.resetTimestamp(timestampInNs);
}
......
......@@ -15,7 +15,6 @@ public class Relay<T> extends AbstractStage<T, T> {
if (null == element) {
if (this.getInputPort().getPipe().isClosed()) {
this.setReschedulable(false);
this.logger.debug("got end signal; pipe.size: " + this.getInputPort().getPipe().size());
assert 0 == this.getInputPort().getPipe().size();
}
Thread.yield();
......@@ -26,7 +25,6 @@ public class Relay<T> extends AbstractStage<T, T> {
@Override
public void onIsPipelineHead() {
this.logger.debug("onIsPipelineHead");
if (this.getInputPort().getPipe().isClosed()) {
this.setReschedulable(false);
}
......
......@@ -19,7 +19,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import teetime.util.HashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
......@@ -40,7 +39,12 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE
private long maxTraceTimeout = Long.MAX_VALUE;
private long maxEncounteredLoggingTimestamp = -1;
private final Map<Long, TraceBuffer> traceId2trace = new HashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private final Map<Long, TraceBuffer> traceId2trace;
public TraceReconstructionFilter(final Map<Long, TraceBuffer> traceId2trace) {
super();
this.traceId2trace = traceId2trace;
}
@Override
protected void execute5(final IFlowRecord element) {
......
package teetime.variant.methodcallWithPorts.stage.kieker.traceReduction;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
/**
* Buffer for similar traces that are to be aggregated into a single trace.
*
* @author Jan Waller, Florian Biss
*/
public final class TraceAggregationBuffer {
private final long bufferCreatedTimestamp;
private final TraceEventRecords aggregatedTrace;
private int countOfAggregatedTraces;
public TraceAggregationBuffer(final long bufferCreatedTimestamp, final TraceEventRecords trace) {
this.bufferCreatedTimestamp = bufferCreatedTimestamp;
this.aggregatedTrace = trace;
}
public void count() {
this.countOfAggregatedTraces++;
}
public long getBufferCreatedTimestamp() {
return this.bufferCreatedTimestamp;
}
public TraceEventRecords getTraceEventRecords() {
return this.aggregatedTrace;
}
public int getCount() {
return this.countOfAggregatedTraces;
}
}
package teetime.variant.methodcallWithPorts.stage.kieker.traceReduction;
import java.io.Serializable;
import java.util.Comparator;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.record.flow.trace.AbstractTraceEvent;
import kieker.common.record.flow.trace.operation.AbstractOperationEvent;
import kieker.common.record.flow.trace.operation.AfterOperationFailedEvent;
/**
* @author Jan Waller, Florian Fittkau, Florian Biss
*/
public final class TraceComperator implements Comparator<TraceEventRecords>, Serializable {
private static final long serialVersionUID = 8920766818232517L;
/**
* Creates a new instance of this class.
*/
public TraceComperator() {
// default empty constructor
}
/**
* {@inheritDoc}
*/
@Override
public int compare(final TraceEventRecords t1, final TraceEventRecords t2) {
final AbstractTraceEvent[] recordsT1 = t1.getTraceEvents();
final AbstractTraceEvent[] recordsT2 = t2.getTraceEvents();
if (recordsT1.length != recordsT2.length) {
return recordsT1.length - recordsT2.length;
}
final int cmpHostnames = t1.getTraceMetadata().getHostname()
.compareTo(t2.getTraceMetadata().getHostname());
if (cmpHostnames != 0) {
return cmpHostnames;
}
for (int i = 0; i < recordsT1.length; i++) {
final AbstractTraceEvent recordT1 = recordsT1[i];
final AbstractTraceEvent recordT2 = recordsT2[i];
final int cmpClass = recordT1.getClass().getName()
.compareTo(recordT2.getClass().getName());
if (cmpClass != 0) {
return cmpClass;
}
if (recordT1 instanceof AbstractOperationEvent) {
final int cmpSignature = ((AbstractOperationEvent) recordT1).getOperationSignature()
.compareTo(((AbstractOperationEvent) recordT2).getOperationSignature());
if (cmpSignature != 0) {
return cmpSignature;
}
}
if (recordT1 instanceof AfterOperationFailedEvent) {
final int cmpError = ((AfterOperationFailedEvent) recordT1).getCause().compareTo(
((AfterOperationFailedEvent) recordT2).getCause());
if (cmpError != 0) {
return cmpClass;
}
}
}
// All records match.
return 0;
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.kieker.traceReduction;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
/**
* This filter collects incoming traces for a specified amount of time.
* Any traces representing the same series of events will be used to calculate statistical informations like the average runtime of this kind of trace.
* Only one specimen of these traces containing this information will be forwarded from this filter.
*
* Statistical outliers regarding the runtime of the trace will be treated special and therefore send out as they are and will not be mixed with others.
*
* @author Jan Waller, Florian Biss
*
* @since
*/
public class TraceReductionFilter extends ConsumerStage<TraceEventRecords, TraceEventRecords> {
private final InputPort<Long> triggerInputPort = new InputPort<Long>(this);
private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer;
private long maxCollectionDurationInNs;
public TraceReductionFilter(final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer) {
this.trace2buffer = trace2buffer;
}
@Override
protected void execute5(final TraceEventRecords traceEventRecords) {
Long timestampInNs = this.triggerInputPort.receive();
if (timestampInNs != null) {
synchronized (this) {
this.processTimeoutQueue(timestampInNs);
}
}
final long timestamp = System.nanoTime();
synchronized (this) {
TraceAggregationBuffer traceBuffer = this.trace2buffer.get(traceEventRecords);
if (traceBuffer == null) { // NOCS (DCL)
traceBuffer = new TraceAggregationBuffer(timestamp, traceEventRecords);
this.trace2buffer.put(traceEventRecords, traceBuffer);
}
traceBuffer.count();
}
}
@Override
public void onIsPipelineHead() {
synchronized (this) {
for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) {
final TraceAggregationBuffer buffer = entry.getValue();
final TraceEventRecords record = buffer.getTraceEventRecords();
record.setCount(buffer.getCount());
this.send(record);
}
this.trace2buffer.clear();
}
super.onIsPipelineHead();
}
private void processTimeoutQueue(final long timestampInNs) {
final long bufferTimeoutInNs = timestampInNs - this.maxCollectionDurationInNs;
for (final Iterator<Entry<TraceEventRecords, TraceAggregationBuffer>> iterator = this.trace2buffer.entrySet().iterator(); iterator.hasNext();) {
final TraceAggregationBuffer traceBuffer = iterator.next().getValue();
// this.logger.debug("traceBuffer.getBufferCreatedTimestamp(): " + traceBuffer.getBufferCreatedTimestamp() + " vs. " + bufferTimeoutInNs
// + " (bufferTimeoutInNs)");
if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeoutInNs) {
final TraceEventRecords record = traceBuffer.getTraceEventRecords();
record.setCount(traceBuffer.getCount());
this.send(record);
}
iterator.remove();
}
}
public long getMaxCollectionDuration() {
return this.maxCollectionDurationInNs;
}
public void setMaxCollectionDuration(final long maxCollectionDuration) {
this.maxCollectionDurationInNs = maxCollectionDuration;
}
public InputPort<Long> getTriggerInputPort() {
return this.triggerInputPort;
}
}
......@@ -61,7 +61,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest {
}
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit");
// assertEquals(1000, analysis.getNumTraces());
assertEquals(1000000, analysis.getNumTraces());
......
......@@ -2,7 +2,10 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstruction;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
......@@ -30,10 +33,10 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
private Thread clock2Thread;
private Thread workerThread;
private CountingFilter<IMonitoringRecord> recordCounter;
private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private CountingFilter<IMonitoringRecord> recordCounter;
private CountingFilter<TraceEventRecords> traceCounter;
private ElementThroughputMeasuringStage<IFlowRecord> recordThroughputFilter;
private ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter;
......@@ -71,7 +74,7 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
this.recordThroughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace);
this.traceThroughputFilter = new ElementThroughputMeasuringStage<TraceEventRecords>();
this.traceCounter = new CountingFilter<TraceEventRecords>();
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
......
......@@ -3,7 +3,10 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstruction;
import java.io.File;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
......@@ -35,11 +38,10 @@ public class TraceReconstructionAnalysis extends Analysis {
private Thread workerThread;
private ClassNameRegistryRepository classNameRegistryRepository;
private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private CountingFilter<IMonitoringRecord> recordCounter;
private CountingFilter<TraceEventRecords> traceCounter;
private ElementThroughputMeasuringStage<IFlowRecord> throughputFilter;
private File inputDir;
......@@ -73,7 +75,7 @@ public class TraceReconstructionAnalysis extends Analysis {
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
this.throughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace);
this.traceCounter = new CountingFilter<TraceEventRecords>();
final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection);
......
......@@ -80,11 +80,13 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize());
Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays());
System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record");
// Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays());
// System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record");
Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays());
System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace");
// Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays());
// System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace");
Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit");
// assertEquals(1000, analysis.getNumTraces());
assertEquals(1000000, analysis.getNumTraces());
......
......@@ -5,7 +5,10 @@ import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
......@@ -16,6 +19,7 @@ import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
......@@ -120,17 +124,19 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
}
}
private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private final StageFactory<CountingFilter<IMonitoringRecord>> recordCounterFactory;
private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory;
private final StageFactory<CountingFilter<TraceEventRecords>> traceCounterFactory;
private final StageFactory<ElementDelayMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory;
private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory;
public TcpTraceReconstructionAnalysisWithThreads() {
try {
this.recordCounterFactory = new StageFactory(CountingFilter.class.getConstructor());
this.recordThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor());
this.traceCounterFactory = new StageFactory(CountingFilter.class.getConstructor());
this.traceThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor());
this.traceThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor());
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(e);
} catch (SecurityException e) {
......@@ -147,11 +153,11 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace);
CountingFilter<TraceEventRecords> traceCounter = this.traceCounterFactory.create();
ElementDelayMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create();
// EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create();
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
// EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
// connect stages
this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
......@@ -160,34 +166,35 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
// // SingleElementPipe.connect(relay.getOutputPort(), this.recordCounter.getInputPort());
// // SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(relay.getOutputPort(), sysout.getInputPort());
// SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort());
SingleElementPipe.connect(relay.getOutputPort(), recordThroughputFilter.getInputPort());
SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), endStage.getInputPort());
// SingleElementPipe.connect(relay.getOutputPort(), recordThroughputFilter.getInputPort());
// SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), endStage.getInputPort());
// // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
// // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
// // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort());
// // SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
// SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort());
// // SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort());
// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort());
// SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10);
SpScPipe.connect(clockStage.getOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10);
// SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10);
// SpScPipe.connect(clockStage.getOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clockStage.getOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
// create and configure pipeline
// Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
Pipeline<IMonitoringRecord, IMonitoringRecord> pipeline = new Pipeline<IMonitoringRecord, IMonitoringRecord>();
Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(recordThroughputFilter);
// pipeline.addIntermediateStage(recordThroughputFilter);
// pipeline.addIntermediateStage(sysout);
// pipeline.addIntermediateStage(instanceOfFilter);
pipeline.addIntermediateStage(instanceOfFilter);
// pipeline.addIntermediateStage(this.recordThroughputFilter);
// pipeline.addIntermediateStage(traceReconstructionFilter);
// pipeline.addIntermediateStage(this.traceThroughputFilter);
pipeline.addIntermediateStage(traceReconstructionFilter);
pipeline.addIntermediateStage(traceThroughputFilter);
// pipeline.addIntermediateStage(this.traceCounter);
pipeline.setLastStage(endStage);
return pipeline;
......@@ -246,10 +253,10 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
return throughputs;
}
public List<Long> getTraceDelays() {
public List<Long> getTraceThroughputs() {
List<Long> throughputs = new LinkedList<Long>();
for (ElementDelayMeasuringStage<TraceEventRecords> stage : this.traceThroughputFilterFactory.getStages()) {
throughputs.addAll(stage.getDelays());
for (ElementThroughputMeasuringStage<TraceEventRecords> stage : this.traceThroughputFilterFactory.getStages()) {
throughputs.addAll(stage.getThroughputs());
}
return throughputs;
}
......
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.traceReductionWithThreads;
import static org.junit.Assert.assertEquals;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
/**
* @author Christian Wulf
*
* @since 1.10
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ChwWorkTcpTraceReductionAnalysisWithThreadsTest {
private StopWatch stopWatch;
@Before
public void before() {
this.stopWatch = new StopWatch();
}
@After
public void after() {
long overallDurationInNs = this.stopWatch.getDurationInNs();
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
}
@Test
public void performAnalysisWith1Thread() {
this.performAnalysis(1);
}
@Test
public void performAnalysisWith2Threads() {
this.performAnalysis(2);
}
@Test
public void performAnalysisWith4Threads() {
this.performAnalysis(4);
}
void performAnalysis(final int numWorkerThreads) {
final TcpTraceReductionAnalysisWithThreads analysis = new TcpTraceReductionAnalysisWithThreads();
analysis.setNumWorkerThreads(numWorkerThreads);
analysis.init();
this.stopWatch.start();
try {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize());
// Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays());
// System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record");
// Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays());
// System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace");
Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit");
// assertEquals(1000, analysis.getNumTraces());
assertEquals(1000000, analysis.getNumTraces());
// TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
// assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
//
// TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
// assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
// assertEquals(21001, analysis.getNumRecords());
assertEquals(21000001, analysis.getNumRecords());
}
public static void main(final String[] args) {
ChwWorkTcpTraceReductionAnalysisWithThreadsTest analysis = new ChwWorkTcpTraceReductionAnalysisWithThreadsTest();
analysis.before();
try {
analysis.performAnalysisWith1Thread();
} catch (Exception e) {
System.err.println(e);
}
analysis.after();
analysis.before();
try {
analysis.performAnalysisWith2Threads();
} catch (Exception e) {
System.err.println(e);
}
analysis.after();
analysis.before();
try {
analysis.performAnalysisWith4Threads();
} catch (Exception e) {
System.err.println(e);
}
analysis.after();
}
}
package teetime.variant.methodcallWithPorts.examples.traceReductionWithThreads;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReduction.TraceAggregationBuffer;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReduction.TraceComperator;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReduction.TraceReductionFilter;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.flow.IFlowRecord;
public class TcpTraceReductionAnalysisWithThreads extends Analysis {
private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
private static final int TCP_RELAY_MAX_SIZE = 500000;
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private Thread tcpThread;
private Thread clockThread;
private Thread clock2Thread;
private Thread[] workerThreads;
private SpScPipe<IMonitoringRecord> tcpRelayPipe;
private int numWorkerThreads;
@Override
public void init() {
super.init();
StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage(clockStage));
StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(5000);
this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
this.workerThreads = new Thread[this.numWorkerThreads];
for (int i = 0; i < this.workerThreads.length; i++) {
StageWithPort<?, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
}
}
private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
TCPReader tcpReader = new TCPReader();
Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
// create and configure pipeline
Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
pipeline.setFirstStage(tcpReader);
pipeline.setLastStage(distributor);
return pipeline;
}
private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) {
Clock clock = new Clock();
clock.setInitialDelayInMs(intervalDelayInMs);
clock.setIntervalDelayInMs(intervalDelayInMs);
Distributor<Long> distributor = new Distributor<Long>();
SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
// create and configure pipeline
Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>();
pipeline.setFirstStage(clock);
pipeline.setLastStage(distributor);
return pipeline;
}
private static class StageFactory<T extends StageWithPort<?, ?>> {
private final Constructor<T> constructor;
private final List<T> stages = new ArrayList<T>();
public StageFactory(final Constructor<T> constructor) {
this.constructor = constructor;
}
public T create(final Object... initargs) {
try {
T stage = this.constructor.newInstance(initargs);
this.stages.add(stage);
return stage;
} catch (InstantiationException e) {
throw new IllegalStateException(e);
} catch (IllegalAccessException e) {
throw new IllegalStateException(e);
} catch (IllegalArgumentException e) {
throw new IllegalStateException(e);
} catch (InvocationTargetException e) {
throw new IllegalStateException(e);
}
}
public List<T> getStages() {
return this.stages;
}
}
private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator());
private final StageFactory<CountingFilter<IMonitoringRecord>> recordCounterFactory;
private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory;
private final StageFactory<CountingFilter<TraceEventRecords>> traceCounterFactory;
private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory;
public TcpTraceReductionAnalysisWithThreads() {
try {
this.recordCounterFactory = new StageFactory(CountingFilter.class.getConstructor());
this.recordThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor());
this.traceCounterFactory = new StageFactory(CountingFilter.class.getConstructor());
this.traceThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor());
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(e);
} catch (SecurityException e) {
throw new IllegalArgumentException(e);
}
}
private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline,
final StageWithPort<Void, Long> clockStage,
final StageWithPort<Void, Long> clock2Stage) {
// create stages
Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
CountingFilter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace);
TraceReductionFilter traceReductionFilter = new TraceReductionFilter(this.trace2buffer);
CountingFilter<TraceEventRecords> traceCounter = this.traceCounterFactory.create();
ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create();
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
// connect stages
this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceReductionFilter.getInputPort());
SingleElementPipe.connect(traceReductionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort());
// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
// SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(clock2Stage.getOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clockStage.getOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
// create and configure pipeline
Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(instanceOfFilter);
pipeline.addIntermediateStage(traceReconstructionFilter);
pipeline.addIntermediateStage(traceReductionFilter);
pipeline.addIntermediateStage(traceThroughputFilter);
pipeline.setLastStage(endStage);
return pipeline;
}
@Override
public void start() {
super.start();
this.tcpThread.start();
this.clockThread.start();
this.clock2Thread.start();
for (Thread workerThread : this.workerThreads) {
workerThread.start();
}
try {
this.tcpThread.join();
for (Thread workerThread : this.workerThreads) {
workerThread.join();
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
this.clockThread.interrupt();
this.clock2Thread.interrupt();
}
public List<TraceEventRecords> getElementCollection() {
return this.elementCollection;
}
public int getNumRecords() {
int sum = 0;
for (CountingFilter<IMonitoringRecord> stage : this.recordCounterFactory.getStages()) {
sum += stage.getNumElementsPassed();
}
return sum;
}
public int getNumTraces() {
int sum = 0;
for (CountingFilter<TraceEventRecords> stage : this.traceCounterFactory.getStages()) {
sum += stage.getNumElementsPassed();
}
return sum;
}
public List<Long> getRecordDelays() {
List<Long> throughputs = new LinkedList<Long>();
for (ElementDelayMeasuringStage<IMonitoringRecord> stage : this.recordThroughputFilterFactory.getStages()) {
throughputs.addAll(stage.getDelays());
}
return throughputs;
}
public List<Long> getTraceThroughputs() {
List<Long> throughputs = new LinkedList<Long>();
for (ElementThroughputMeasuringStage<TraceEventRecords> stage : this.traceThroughputFilterFactory.getStages()) {
throughputs.addAll(stage.getThroughputs());
}
return throughputs;
}
public SpScPipe<IMonitoringRecord> getTcpRelayPipe() {
return this.tcpRelayPipe;
}
public int getNumWorkerThreads() {
return this.numWorkerThreads;
}
public void setNumWorkerThreads(final int numWorkerThreads) {
this.numWorkerThreads = numWorkerThreads;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment