Skip to content
Snippets Groups Projects
Commit 8ba52389 authored by Christian Wulf's avatar Christian Wulf
Browse files

added performance test TcpTraceReconstructionAnalysisWithThreads

parent 029dc624
No related branches found
No related tags found
No related merge requests found
Showing
with 340 additions and 63 deletions
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.util;
import java.util.HashMap;
import teetime.util.concurrent.hashmap.ValueFactory;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class HashMapWithDefault<K, V> extends HashMap<K, V> {
private static final long serialVersionUID = -7958038532219740472L;
private final ValueFactory<V> valueFactory;
/**
* @since 1.10
*/
public HashMapWithDefault(final ValueFactory<V> valueFactory) {
this.valueFactory = valueFactory;
}
/**
* @return the corresponding value if the key exists. Otherwise, it creates,
* inserts, and returns a new default value.
*/
@SuppressWarnings("unchecked")
@Override
public V get(final Object key) {
V value = super.get(key);
if (value == null) {
value = this.valueFactory.create();
super.put((K) key, value);
}
return value;
}
}
......@@ -7,23 +7,26 @@ import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class SpScPipe<T> extends AbstractPipe<T> {
private final FFBufferOrdered3<T> queue;
private int maxSize;
private SpScPipe(final int initialCapacity) {
this.queue = new FFBufferOrdered3<T>(initialCapacity);
private SpScPipe(final int capacity) {
this.queue = new FFBufferOrdered3<T>(capacity);
}
public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort, final int initialCapacity) {
IPipe<T> pipe = new SpScPipe<T>(initialCapacity);
public static <T> SpScPipe<T> connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort, final int capacity) {
SpScPipe<T> pipe = new SpScPipe<T>(capacity);
targetPort.setPipe(pipe);
if (sourcePort != null) {
sourcePort.setPipe(pipe);
sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
return pipe;
}
@Override
public void add(final T element) {
this.queue.offer(element);
this.maxSize = Math.max(this.queue.size(), this.maxSize);
}
@Override
......@@ -46,4 +49,8 @@ public class SpScPipe<T> extends AbstractPipe<T> {
return this.queue.peek();
}
public int getMaxSize() {
return this.maxSize;
}
}
......@@ -38,7 +38,9 @@ public final class Distributor<T> extends AbstractStage<T, T> {
public void onIsPipelineHead() {
for (OutputPort<?> op : this.outputPorts) {
op.getPipe().close();
System.out.println("End signal sent, size: " + op.getPipe().size());
if (this.logger.isDebugEnabled()) {
this.logger.debug("End signal sent, size: " + op.getPipe().size());
}
}
// for (OutputPort<?> op : this.outputPorts) {
......
......@@ -16,6 +16,7 @@ public class Relay<T> extends AbstractStage<T, T> {
if (this.getInputPort().getPipe().isClosed()) {
this.setReschedulable(false);
System.out.println("got end signal; pipe.size: " + this.getInputPort().getPipe().size());
assert 0 == this.getInputPort().getPipe().size();
}
return;
}
......
......@@ -17,10 +17,9 @@ package teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.HashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
......@@ -39,46 +38,22 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE
private TimeUnit timeunit;
private long maxTraceDuration = Long.MAX_VALUE;
private long maxTraceTimeout = Long.MAX_VALUE;
private boolean timeout;
private long maxEncounteredLoggingTimestamp = -1;
private static final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private static final Map<Long, TraceBuffer> traceId2trace = new HashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
@Override
protected void execute5(final IFlowRecord element) {
final Long traceId = this.reconstructTrace(element);
if (traceId != null) {
this.putIfFinished(traceId);
this.processTimestamp(element);
}
}
private void processTimestamp(final IFlowRecord record) {
if (this.timeout) {
synchronized (this) {
final long loggingTimestamp = this.getTimestamp(record);
// can we assume a rough order of logging timestamps? (yes, except with DB reader)
if (loggingTimestamp > this.maxEncounteredLoggingTimestamp) {
this.maxEncounteredLoggingTimestamp = loggingTimestamp;
}
this.processTimeoutQueue(this.maxEncounteredLoggingTimestamp);
}
}
}
private long getTimestamp(final IFlowRecord record) {
if (record instanceof AbstractTraceEvent) {
return ((AbstractTraceEvent) record).getTimestamp();
}
return -1;
}
private void putIfFinished(final Long traceId) {
final TraceBuffer traceBuffer = TraceReconstructionFilter.traceId2trace.get(traceId);
if (traceBuffer.isFinished()) {
synchronized (this) { // has to be synchronized because of timeout cleanup
TraceReconstructionFilter.traceId2trace.remove(traceId);
}
TraceReconstructionFilter.traceId2trace.remove(traceId);
this.put(traceBuffer);
}
}
......@@ -100,12 +75,6 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE
return traceId;
}
@Override
public void onStart() {
this.timeout = !((this.maxTraceTimeout == Long.MAX_VALUE) && (this.maxTraceDuration == Long.MAX_VALUE));
super.onStart();
}
@Override
public void onIsPipelineHead() {
Iterator<TraceBuffer> iterator = TraceReconstructionFilter.traceId2trace.values().iterator();
......@@ -118,20 +87,6 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE
super.onIsPipelineHead();
}
private void processTimeoutQueue(final long timestamp) {
final long duration = timestamp - this.maxTraceDuration;
final long traceTimeout = timestamp - this.maxTraceTimeout;
for (final Iterator<Entry<Long, TraceBuffer>> iterator = TraceReconstructionFilter.traceId2trace.entrySet().iterator(); iterator.hasNext();) {
final TraceBuffer traceBuffer = iterator.next().getValue();
if ((traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) // long time no see
|| (traceBuffer.getMinLoggingTimestamp() <= duration)) { // max duration is gone
this.put(traceBuffer);
iterator.remove();
}
}
}
private void put(final TraceBuffer traceBuffer) {
// final IOutputPort<TraceReconstructionFilter, TraceEventRecords> outputPort =
// (traceBuffer.isInvalid()) ? this.traceInvalidOutputPort : this.traceValidOutputPort;
......
......@@ -60,8 +60,11 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest {
analysis.onTerminate();
}
assertEquals(21001, analysis.getNumRecords());
assertEquals(1000, analysis.getNumTraces());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
// assertEquals(1000, analysis.getNumTraces());
assertEquals(1000000, analysis.getNumTraces());
// TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
// assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
......@@ -69,8 +72,8 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest {
// TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
// assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
// assertEquals(21001, analysis.getNumRecords());
assertEquals(21000001, analysis.getNumRecords());
}
}
......@@ -15,7 +15,6 @@ import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.ThroughputFilter;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
......@@ -30,8 +29,6 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
private Thread clock2Thread;
private Thread workerThread;
private ClassNameRegistryRepository classNameRegistryRepository;
private CountingFilter<IMonitoringRecord> recordCounter;
private CountingFilter<TraceEventRecords> traceCounter;
......@@ -67,8 +64,6 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
}
private Pipeline<Void, TraceEventRecords> buildPipeline(final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) {
this.classNameRegistryRepository = new ClassNameRegistryRepository();
// create stages
TCPReader tcpReader = new TCPReader();
this.recordCounter = new CountingFilter<IMonitoringRecord>();
......@@ -139,7 +134,7 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
return this.recordThroughputFilter.getThroughputs();
}
public List<Long> getTraceThroughputFilter() {
public List<Long> getTraceThroughputs() {
return this.traceThroughputFilter.getThroughputs();
}
......
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads;
import static org.junit.Assert.assertEquals;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
private StopWatch stopWatch;
@Before
public void before() {
this.stopWatch = new StopWatch();
}
@After
public void after() {
long overallDurationInNs = this.stopWatch.getDurationInNs();
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
}
@Test
public void performAnalysis() {
final TcpTraceReconstructionAnalysisWithThreads analysis = new TcpTraceReconstructionAnalysisWithThreads();
analysis.init();
this.stopWatch.start();
try {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
System.out.println("Max size of pipe: " + analysis.getTcpRelayPipe().getMaxSize());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
// assertEquals(1000, analysis.getNumTraces());
assertEquals(1000000, analysis.getNumTraces());
// TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
// assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
//
// TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
// assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
// assertEquals(21001, analysis.getNumRecords());
assertEquals(21000001, analysis.getNumRecords());
}
}
package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads;
import java.util.LinkedList;
import java.util.List;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.ThroughputFilter;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.flow.IFlowRecord;
public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
private static final int TCP_RELAY_MAX_SIZE = 500000;
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private Thread tcpThread;
private Thread clockThread;
private Thread clock2Thread;
private Thread workerThread;
private CountingFilter<IMonitoringRecord> recordCounter;
private CountingFilter<TraceEventRecords> traceCounter;
private ThroughputFilter<IFlowRecord> recordThroughputFilter;
private ThroughputFilter<TraceEventRecords> traceThroughputFilter;
private SpScPipe<IMonitoringRecord> tcpRelayPipe;
@Override
public void init() {
super.init();
StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
StageWithPort<Void, Long> clockStage = this.buildClockPipeline();
this.clockThread = new Thread(new RunnableStage(clockStage));
StageWithPort<Void, Long> clock2Stage = this.buildClock2Pipeline();
this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
StageWithPort<?, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
this.workerThread = new Thread(new RunnableStage(pipeline));
}
private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
TCPReader tcpReader = new TCPReader();
Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
// create and configure pipeline
Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
pipeline.setFirstStage(tcpReader);
pipeline.setLastStage(distributor);
return pipeline;
}
private StageWithPort<Void, Long> buildClockPipeline() {
Clock clock = new Clock();
clock.setIntervalDelayInMs(1000);
return clock;
}
private StageWithPort<Void, Long> buildClock2Pipeline() {
Clock clock = new Clock();
clock.setIntervalDelayInMs(2000);
return clock;
}
private Pipeline<IMonitoringRecord, TraceEventRecords> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline,
final StageWithPort<Void, Long> clockStage,
final StageWithPort<Void, Long> clock2Stage) {
// create stages
Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
this.recordCounter = new CountingFilter<IMonitoringRecord>();
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
this.recordThroughputFilter = new ThroughputFilter<IFlowRecord>();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
this.traceThroughputFilter = new ThroughputFilter<TraceEventRecords>();
this.traceCounter = new CountingFilter<TraceEventRecords>();
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
// connect stages
this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
SingleElementPipe.connect(relay.getOutputPort(), this.recordCounter.getInputPort());
SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
// SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort());
SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 1);
SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 1);
// create and configure pipeline
Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(this.recordCounter);
pipeline.addIntermediateStage(instanceOfFilter);
// pipeline.addIntermediateStage(this.recordThroughputFilter);
pipeline.addIntermediateStage(traceReconstructionFilter);
pipeline.addIntermediateStage(this.traceThroughputFilter);
pipeline.addIntermediateStage(this.traceCounter);
pipeline.setLastStage(endStage);
return pipeline;
}
@Override
public void start() {
super.start();
this.tcpThread.start();
// this.clockThread.start();
this.clock2Thread.start();
this.workerThread.start();
try {
this.tcpThread.join();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
try {
this.workerThread.join();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
this.clockThread.interrupt();
this.clock2Thread.interrupt();
}
public List<TraceEventRecords> getElementCollection() {
return this.elementCollection;
}
public int getNumRecords() {
return this.recordCounter.getNumElementsPassed();
}
public int getNumTraces() {
return this.traceCounter.getNumElementsPassed();
}
public List<Long> getRecordThroughputs() {
return this.recordThroughputFilter.getThroughputs();
}
public List<Long> getTraceThroughputs() {
return this.traceThroughputFilter.getThroughputs();
}
public SpScPipe<IMonitoringRecord> getTcpRelayPipe() {
return this.tcpRelayPipe;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment