diff --git a/conf/logging.properties b/conf/logging.properties index 1fbc6e666e68a074a35511fe5bcb2a9820e018ca..0ec16130fc2101bd1391db7f85b4edf44b27cc29 100644 --- a/conf/logging.properties +++ b/conf/logging.properties @@ -8,5 +8,5 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n #teetime.level = ALL teetime.variant.methodcallWithPorts.framework.core.level = ALL -teetime.variant.methodcallWithPorts.stage.level = WARNING +teetime.variant.methodcallWithPorts.stage.level = FINE teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE diff --git a/scripts/MooBench-cmd/startMooBench.sh b/scripts/MooBench-cmd/startMooBench.sh index d20525f415c722057e2e6926fe901666c5c0286d..2d16af84c7c0b9e558936c29e208f3e39448d0c5 100644 --- a/scripts/MooBench-cmd/startMooBench.sh +++ b/scripts/MooBench-cmd/startMooBench.sh @@ -4,8 +4,8 @@ java=~/jdk1.7.0_60/bin/java cp=.:MooBench.jar:META-INF/kieker.monitoring.properties:META-INF/kieker.logging.properties jvmParams="-javaagent:lib/kieker-1.9_aspectj.jar -Dorg.aspectj.weaver.loadtime.configuration=META-INF/kieker.aop.xml -Dorg.aspectj.weaver.showWeaveInfo=true -Daj.weaving.verbose=true -Dkieker.monitoring.writer=kieker.monitoring.writer.tcp.TCPWriter" params="-d 10 -h 1 -m 0 -t 1000000 -o tmp/test.txt -q" -runs=$1 +#runs=$1 -for i in {1..${runs}}; do +for i in {1..3}; do ${java} -cp ${cp} ${jvmParams} mooBench.benchmark.Benchmark ${params}; done diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java index 30b5f64a500ce82817389a87c12aff6494a0f4fa..3f5c82aca9bf68232b6737c18211918f43f204d6 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java @@ -2,6 +2,7 @@ package teetime.variant.methodcallWithPorts.stage; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.TimeUnit; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.InputPort; @@ -17,12 +18,13 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> { @Override protected void execute5(final T element) { + this.numPassedElements++; + this.send(element); + Long timestampInNs = this.triggerInputPort.receive(); if (timestampInNs != null) { this.computeElementThroughput(System.nanoTime()); } - this.numPassedElements++; - this.send(element); } @Override @@ -33,10 +35,11 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> { private void computeElementThroughput(final Long timestampInNs) { long diffInNs = timestampInNs - this.lastTimestampInNs; - if (diffInNs > 0) { - long throughputInNsPerElement = this.numPassedElements / diffInNs; - this.throughputs.add(throughputInNsPerElement); - this.logger.info("Throughput: " + throughputInNsPerElement + " elements/time unit"); + long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs); + if (diffInSec > 0) { + long throughputPerSec = this.numPassedElements / diffInSec; + this.throughputs.add(throughputPerSec); + this.logger.info("Throughput: " + throughputPerSec + " elements/s" + " -> numPassedElements=" + this.numPassedElements); this.resetTimestamp(timestampInNs); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java index b0a6325c381c44724c67cf6db98f64de2d88afbc..26cb2acd8960a9a09defba578749bc139cf44184 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java @@ -15,7 +15,6 @@ public class Relay<T> extends AbstractStage<T, T> { if (null == element) { if (this.getInputPort().getPipe().isClosed()) { this.setReschedulable(false); - this.logger.debug("got end signal; pipe.size: " + this.getInputPort().getPipe().size()); assert 0 == this.getInputPort().getPipe().size(); } Thread.yield(); @@ -26,7 +25,6 @@ public class Relay<T> extends AbstractStage<T, T> { @Override public void onIsPipelineHead() { - this.logger.debug("onIsPipelineHead"); if (this.getInputPort().getPipe().isClosed()) { this.setReschedulable(false); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java index b2516a07b7e8f1a28981d9172ed0e9531ecb69d2..a998feb81072e36e2763945aabc1c6ada27b7bf5 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java @@ -19,7 +19,6 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.TimeUnit; -import teetime.util.HashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; @@ -40,7 +39,12 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE private long maxTraceTimeout = Long.MAX_VALUE; private long maxEncounteredLoggingTimestamp = -1; - private final Map<Long, TraceBuffer> traceId2trace = new HashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); + private final Map<Long, TraceBuffer> traceId2trace; + + public TraceReconstructionFilter(final Map<Long, TraceBuffer> traceId2trace) { + super(); + this.traceId2trace = traceId2trace; + } @Override protected void execute5(final IFlowRecord element) { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceAggregationBuffer.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceAggregationBuffer.java new file mode 100644 index 0000000000000000000000000000000000000000..80dca6106d60069137cc27e314daa8131f405e7c --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceAggregationBuffer.java @@ -0,0 +1,36 @@ +package teetime.variant.methodcallWithPorts.stage.kieker.traceReduction; + +import kieker.analysis.plugin.filter.flow.TraceEventRecords; + +/** + * Buffer for similar traces that are to be aggregated into a single trace. + * + * @author Jan Waller, Florian Biss + */ +public final class TraceAggregationBuffer { + private final long bufferCreatedTimestamp; + private final TraceEventRecords aggregatedTrace; + + private int countOfAggregatedTraces; + + public TraceAggregationBuffer(final long bufferCreatedTimestamp, final TraceEventRecords trace) { + this.bufferCreatedTimestamp = bufferCreatedTimestamp; + this.aggregatedTrace = trace; + } + + public void count() { + this.countOfAggregatedTraces++; + } + + public long getBufferCreatedTimestamp() { + return this.bufferCreatedTimestamp; + } + + public TraceEventRecords getTraceEventRecords() { + return this.aggregatedTrace; + } + + public int getCount() { + return this.countOfAggregatedTraces; + } +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceComperator.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceComperator.java new file mode 100644 index 0000000000000000000000000000000000000000..d1a5b770fd75df3ae469ec70afd314727566963b --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceComperator.java @@ -0,0 +1,69 @@ +package teetime.variant.methodcallWithPorts.stage.kieker.traceReduction; + +import java.io.Serializable; +import java.util.Comparator; + +import kieker.analysis.plugin.filter.flow.TraceEventRecords; +import kieker.common.record.flow.trace.AbstractTraceEvent; +import kieker.common.record.flow.trace.operation.AbstractOperationEvent; +import kieker.common.record.flow.trace.operation.AfterOperationFailedEvent; + +/** + * @author Jan Waller, Florian Fittkau, Florian Biss + */ +public final class TraceComperator implements Comparator<TraceEventRecords>, Serializable { + private static final long serialVersionUID = 8920766818232517L; + + /** + * Creates a new instance of this class. + */ + public TraceComperator() { + // default empty constructor + } + + /** + * {@inheritDoc} + */ + @Override + public int compare(final TraceEventRecords t1, final TraceEventRecords t2) { + final AbstractTraceEvent[] recordsT1 = t1.getTraceEvents(); + final AbstractTraceEvent[] recordsT2 = t2.getTraceEvents(); + + if (recordsT1.length != recordsT2.length) { + return recordsT1.length - recordsT2.length; + } + + final int cmpHostnames = t1.getTraceMetadata().getHostname() + .compareTo(t2.getTraceMetadata().getHostname()); + if (cmpHostnames != 0) { + return cmpHostnames; + } + + for (int i = 0; i < recordsT1.length; i++) { + final AbstractTraceEvent recordT1 = recordsT1[i]; + final AbstractTraceEvent recordT2 = recordsT2[i]; + + final int cmpClass = recordT1.getClass().getName() + .compareTo(recordT2.getClass().getName()); + if (cmpClass != 0) { + return cmpClass; + } + if (recordT1 instanceof AbstractOperationEvent) { + final int cmpSignature = ((AbstractOperationEvent) recordT1).getOperationSignature() + .compareTo(((AbstractOperationEvent) recordT2).getOperationSignature()); + if (cmpSignature != 0) { + return cmpSignature; + } + } + if (recordT1 instanceof AfterOperationFailedEvent) { + final int cmpError = ((AfterOperationFailedEvent) recordT1).getCause().compareTo( + ((AfterOperationFailedEvent) recordT2).getCause()); + if (cmpError != 0) { + return cmpClass; + } + } + } + // All records match. + return 0; + } +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceReductionFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceReductionFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..931a4e4f735657d88780932f0c7c742dd9421631 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceReductionFilter.java @@ -0,0 +1,112 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package teetime.variant.methodcallWithPorts.stage.kieker.traceReduction; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; + +import kieker.analysis.plugin.filter.flow.TraceEventRecords; + +/** + * This filter collects incoming traces for a specified amount of time. + * Any traces representing the same series of events will be used to calculate statistical informations like the average runtime of this kind of trace. + * Only one specimen of these traces containing this information will be forwarded from this filter. + * + * Statistical outliers regarding the runtime of the trace will be treated special and therefore send out as they are and will not be mixed with others. + * + * @author Jan Waller, Florian Biss + * + * @since + */ +public class TraceReductionFilter extends ConsumerStage<TraceEventRecords, TraceEventRecords> { + + private final InputPort<Long> triggerInputPort = new InputPort<Long>(this); + + private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer; + + private long maxCollectionDurationInNs; + + public TraceReductionFilter(final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer) { + this.trace2buffer = trace2buffer; + } + + @Override + protected void execute5(final TraceEventRecords traceEventRecords) { + Long timestampInNs = this.triggerInputPort.receive(); + if (timestampInNs != null) { + synchronized (this) { + this.processTimeoutQueue(timestampInNs); + } + } + + final long timestamp = System.nanoTime(); + synchronized (this) { + TraceAggregationBuffer traceBuffer = this.trace2buffer.get(traceEventRecords); + if (traceBuffer == null) { // NOCS (DCL) + traceBuffer = new TraceAggregationBuffer(timestamp, traceEventRecords); + this.trace2buffer.put(traceEventRecords, traceBuffer); + } + traceBuffer.count(); + } + } + + @Override + public void onIsPipelineHead() { + synchronized (this) { + for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) { + final TraceAggregationBuffer buffer = entry.getValue(); + final TraceEventRecords record = buffer.getTraceEventRecords(); + record.setCount(buffer.getCount()); + this.send(record); + } + this.trace2buffer.clear(); + } + + super.onIsPipelineHead(); + } + + private void processTimeoutQueue(final long timestampInNs) { + final long bufferTimeoutInNs = timestampInNs - this.maxCollectionDurationInNs; + for (final Iterator<Entry<TraceEventRecords, TraceAggregationBuffer>> iterator = this.trace2buffer.entrySet().iterator(); iterator.hasNext();) { + final TraceAggregationBuffer traceBuffer = iterator.next().getValue(); + // this.logger.debug("traceBuffer.getBufferCreatedTimestamp(): " + traceBuffer.getBufferCreatedTimestamp() + " vs. " + bufferTimeoutInNs + // + " (bufferTimeoutInNs)"); + if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeoutInNs) { + final TraceEventRecords record = traceBuffer.getTraceEventRecords(); + record.setCount(traceBuffer.getCount()); + this.send(record); + } + iterator.remove(); + } + } + + public long getMaxCollectionDuration() { + return this.maxCollectionDurationInNs; + } + + public void setMaxCollectionDuration(final long maxCollectionDuration) { + this.maxCollectionDurationInNs = maxCollectionDuration; + } + + public InputPort<Long> getTriggerInputPort() { + return this.triggerInputPort; + } +} diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java index 41729c527f3a7420e73ebee295bb169fef9266f3..040cacb99fde3a3efbe9c4d3ce646ee0a1621b30 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java @@ -61,7 +61,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest { } Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); // assertEquals(1000, analysis.getNumTraces()); assertEquals(1000000, analysis.getNumTraces()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java index 11beed81bd4adbb1b8c8a0582fcadb5840d74cc9..141ed49ec0c6ecbd8961f2a633481dcde2b1fd6d 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java @@ -2,7 +2,10 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstruction; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; +import teetime.util.concurrent.hashmap.TraceBuffer; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; @@ -30,10 +33,10 @@ public class TcpTraceReconstructionAnalysis extends Analysis { private Thread clock2Thread; private Thread workerThread; - private CountingFilter<IMonitoringRecord> recordCounter; + private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); + private CountingFilter<IMonitoringRecord> recordCounter; private CountingFilter<TraceEventRecords> traceCounter; - private ElementThroughputMeasuringStage<IFlowRecord> recordThroughputFilter; private ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter; @@ -71,7 +74,7 @@ public class TcpTraceReconstructionAnalysis extends Analysis { final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); this.recordThroughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>(); - final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(); + final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace); this.traceThroughputFilter = new ElementThroughputMeasuringStage<TraceEventRecords>(); this.traceCounter = new CountingFilter<TraceEventRecords>(); EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java index 7b02a5d9f79eda21ae58b191710683413b4ca043..8a13ff2d07b26a7468584b8459cc2c62edcb7f88 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java @@ -3,7 +3,10 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstruction; import java.io.File; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; +import teetime.util.concurrent.hashmap.TraceBuffer; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; @@ -35,11 +38,10 @@ public class TraceReconstructionAnalysis extends Analysis { private Thread workerThread; private ClassNameRegistryRepository classNameRegistryRepository; + private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); private CountingFilter<IMonitoringRecord> recordCounter; - private CountingFilter<TraceEventRecords> traceCounter; - private ElementThroughputMeasuringStage<IFlowRecord> throughputFilter; private File inputDir; @@ -73,7 +75,7 @@ public class TraceReconstructionAnalysis extends Analysis { final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); this.throughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>(); - final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(); + final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace); this.traceCounter = new CountingFilter<TraceEventRecords>(); final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java index ed63b1a26d34693342c61736d4ef2156f22cc371..668e263cd41654fd1501f8ceb01e59bb7618d2f3 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -80,11 +80,13 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize()); - Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays()); - System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record"); + // Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays()); + // System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record"); - Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays()); - System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace"); + // Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays()); + // System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace"); + Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs()); + System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit"); // assertEquals(1000, analysis.getNumTraces()); assertEquals(1000000, analysis.getNumTraces()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java index cc36887cbe715ed6b49f34463f5c3531d231a72d..2c13357ceef5a9a7011d448994767da044c905d6 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java @@ -5,7 +5,10 @@ import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; +import teetime.util.concurrent.hashmap.TraceBuffer; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; @@ -16,6 +19,7 @@ import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.CountingFilter; import teetime.variant.methodcallWithPorts.stage.Distributor; import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage; +import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage; import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; import teetime.variant.methodcallWithPorts.stage.Relay; @@ -120,17 +124,19 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { } } + private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); + private final StageFactory<CountingFilter<IMonitoringRecord>> recordCounterFactory; private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory; private final StageFactory<CountingFilter<TraceEventRecords>> traceCounterFactory; - private final StageFactory<ElementDelayMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory; + private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory; public TcpTraceReconstructionAnalysisWithThreads() { try { this.recordCounterFactory = new StageFactory(CountingFilter.class.getConstructor()); this.recordThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor()); this.traceCounterFactory = new StageFactory(CountingFilter.class.getConstructor()); - this.traceThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor()); + this.traceThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor()); } catch (NoSuchMethodException e) { throw new IllegalArgumentException(e); } catch (SecurityException e) { @@ -147,11 +153,11 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create(); - final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(); + final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace); CountingFilter<TraceEventRecords> traceCounter = this.traceCounterFactory.create(); - ElementDelayMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create(); - // EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); - EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>(); + ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create(); + EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); + // EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>(); // connect stages this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); @@ -160,34 +166,35 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { // // SingleElementPipe.connect(relay.getOutputPort(), this.recordCounter.getInputPort()); // // SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); - // SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); + SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); // SingleElementPipe.connect(relay.getOutputPort(), sysout.getInputPort()); // SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort()); - SingleElementPipe.connect(relay.getOutputPort(), recordThroughputFilter.getInputPort()); - SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), endStage.getInputPort()); + // SingleElementPipe.connect(relay.getOutputPort(), recordThroughputFilter.getInputPort()); + // SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), endStage.getInputPort()); // // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); // // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - // // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort()); - // // SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); + SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); + // SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); + SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort()); // // SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort()); // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort()); // SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10); - SpScPipe.connect(clockStage.getOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10); - // SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10); + // SpScPipe.connect(clockStage.getOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10); + SpScPipe.connect(clockStage.getOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); // create and configure pipeline // Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>(); - Pipeline<IMonitoringRecord, IMonitoringRecord> pipeline = new Pipeline<IMonitoringRecord, IMonitoringRecord>(); + Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>(); pipeline.setFirstStage(relay); - pipeline.addIntermediateStage(recordThroughputFilter); + // pipeline.addIntermediateStage(recordThroughputFilter); // pipeline.addIntermediateStage(sysout); - // pipeline.addIntermediateStage(instanceOfFilter); + pipeline.addIntermediateStage(instanceOfFilter); // pipeline.addIntermediateStage(this.recordThroughputFilter); - // pipeline.addIntermediateStage(traceReconstructionFilter); - // pipeline.addIntermediateStage(this.traceThroughputFilter); + pipeline.addIntermediateStage(traceReconstructionFilter); + pipeline.addIntermediateStage(traceThroughputFilter); // pipeline.addIntermediateStage(this.traceCounter); pipeline.setLastStage(endStage); return pipeline; @@ -246,10 +253,10 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { return throughputs; } - public List<Long> getTraceDelays() { + public List<Long> getTraceThroughputs() { List<Long> throughputs = new LinkedList<Long>(); - for (ElementDelayMeasuringStage<TraceEventRecords> stage : this.traceThroughputFilterFactory.getStages()) { - throughputs.addAll(stage.getDelays()); + for (ElementThroughputMeasuringStage<TraceEventRecords> stage : this.traceThroughputFilterFactory.getStages()) { + throughputs.addAll(stage.getThroughputs()); } return throughputs; } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java new file mode 100644 index 0000000000000000000000000000000000000000..19c277b88055e601d097ed88af4c3623a54ebe1f --- /dev/null +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java @@ -0,0 +1,131 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.examples.traceReductionWithThreads; + +import static org.junit.Assert.assertEquals; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Before; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +import teetime.util.StatisticsUtil; +import teetime.util.StopWatch; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class ChwWorkTcpTraceReductionAnalysisWithThreadsTest { + + private StopWatch stopWatch; + + @Before + public void before() { + this.stopWatch = new StopWatch(); + } + + @After + public void after() { + long overallDurationInNs = this.stopWatch.getDurationInNs(); + System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms"); + } + + @Test + public void performAnalysisWith1Thread() { + this.performAnalysis(1); + } + + @Test + public void performAnalysisWith2Threads() { + this.performAnalysis(2); + } + + @Test + public void performAnalysisWith4Threads() { + this.performAnalysis(4); + } + + void performAnalysis(final int numWorkerThreads) { + final TcpTraceReductionAnalysisWithThreads analysis = new TcpTraceReductionAnalysisWithThreads(); + analysis.setNumWorkerThreads(numWorkerThreads); + analysis.init(); + + this.stopWatch.start(); + try { + analysis.start(); + } finally { + this.stopWatch.end(); + analysis.onTerminate(); + } + + System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize()); + + // Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays()); + // System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record"); + + // Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays()); + // System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace"); + Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs()); + System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit"); + + // assertEquals(1000, analysis.getNumTraces()); + assertEquals(1000000, analysis.getNumTraces()); + + // TraceEventRecords trace6884 = analysis.getElementCollection().get(0); + // assertEquals(6884, trace6884.getTraceMetadata().getTraceId()); + // + // TraceEventRecords trace6886 = analysis.getElementCollection().get(1); + // assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); + + // assertEquals(21001, analysis.getNumRecords()); + assertEquals(21000001, analysis.getNumRecords()); + } + + public static void main(final String[] args) { + ChwWorkTcpTraceReductionAnalysisWithThreadsTest analysis = new ChwWorkTcpTraceReductionAnalysisWithThreadsTest(); + analysis.before(); + try { + analysis.performAnalysisWith1Thread(); + } catch (Exception e) { + System.err.println(e); + } + analysis.after(); + + analysis.before(); + try { + analysis.performAnalysisWith2Threads(); + } catch (Exception e) { + System.err.println(e); + } + analysis.after(); + + analysis.before(); + try { + analysis.performAnalysisWith4Threads(); + } catch (Exception e) { + System.err.println(e); + } + analysis.after(); + } + +} diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java new file mode 100644 index 0000000000000000000000000000000000000000..e916bf9e5cd083e04ac2f475a35d3f554f29cc8a --- /dev/null +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java @@ -0,0 +1,267 @@ +package teetime.variant.methodcallWithPorts.examples.traceReductionWithThreads; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; +import teetime.util.concurrent.hashmap.TraceBuffer; +import teetime.variant.explicitScheduling.framework.core.Analysis; +import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; +import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; +import teetime.variant.methodcallWithPorts.stage.Clock; +import teetime.variant.methodcallWithPorts.stage.CountingFilter; +import teetime.variant.methodcallWithPorts.stage.Distributor; +import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage; +import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage; +import teetime.variant.methodcallWithPorts.stage.EndStage; +import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; +import teetime.variant.methodcallWithPorts.stage.Relay; +import teetime.variant.methodcallWithPorts.stage.io.TCPReader; +import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter; +import teetime.variant.methodcallWithPorts.stage.kieker.traceReduction.TraceAggregationBuffer; +import teetime.variant.methodcallWithPorts.stage.kieker.traceReduction.TraceComperator; +import teetime.variant.methodcallWithPorts.stage.kieker.traceReduction.TraceReductionFilter; + +import kieker.analysis.plugin.filter.flow.TraceEventRecords; +import kieker.common.record.IMonitoringRecord; +import kieker.common.record.flow.IFlowRecord; + +public class TcpTraceReductionAnalysisWithThreads extends Analysis { + + private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors(); + private static final int TCP_RELAY_MAX_SIZE = 500000; + + private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); + + private Thread tcpThread; + private Thread clockThread; + private Thread clock2Thread; + private Thread[] workerThreads; + + private SpScPipe<IMonitoringRecord> tcpRelayPipe; + private int numWorkerThreads; + + @Override + public void init() { + super.init(); + StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline(); + this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); + + StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000); + this.clockThread = new Thread(new RunnableStage(clockStage)); + + StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(5000); + this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); + + this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); + this.workerThreads = new Thread[this.numWorkerThreads]; + + for (int i = 0; i < this.workerThreads.length; i++) { + StageWithPort<?, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); + this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); + } + } + + private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() { + TCPReader tcpReader = new TCPReader(); + Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); + + SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); + + // create and configure pipeline + Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>(); + pipeline.setFirstStage(tcpReader); + pipeline.setLastStage(distributor); + return pipeline; + } + + private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) { + Clock clock = new Clock(); + clock.setInitialDelayInMs(intervalDelayInMs); + clock.setIntervalDelayInMs(intervalDelayInMs); + Distributor<Long> distributor = new Distributor<Long>(); + + SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); + + // create and configure pipeline + Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>(); + pipeline.setFirstStage(clock); + pipeline.setLastStage(distributor); + return pipeline; + } + + private static class StageFactory<T extends StageWithPort<?, ?>> { + + private final Constructor<T> constructor; + private final List<T> stages = new ArrayList<T>(); + + public StageFactory(final Constructor<T> constructor) { + this.constructor = constructor; + } + + public T create(final Object... initargs) { + try { + T stage = this.constructor.newInstance(initargs); + this.stages.add(stage); + return stage; + } catch (InstantiationException e) { + throw new IllegalStateException(e); + } catch (IllegalAccessException e) { + throw new IllegalStateException(e); + } catch (IllegalArgumentException e) { + throw new IllegalStateException(e); + } catch (InvocationTargetException e) { + throw new IllegalStateException(e); + } + } + + public List<T> getStages() { + return this.stages; + } + } + + private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); + private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator()); + + private final StageFactory<CountingFilter<IMonitoringRecord>> recordCounterFactory; + private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory; + private final StageFactory<CountingFilter<TraceEventRecords>> traceCounterFactory; + private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory; + + public TcpTraceReductionAnalysisWithThreads() { + try { + this.recordCounterFactory = new StageFactory(CountingFilter.class.getConstructor()); + this.recordThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor()); + this.traceCounterFactory = new StageFactory(CountingFilter.class.getConstructor()); + this.traceThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor()); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException(e); + } catch (SecurityException e) { + throw new IllegalArgumentException(e); + } + } + + private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline, + final StageWithPort<Void, Long> clockStage, + final StageWithPort<Void, Long> clock2Stage) { + // create stages + Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); + CountingFilter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); + final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( + IFlowRecord.class); + ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create(); + final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace); + TraceReductionFilter traceReductionFilter = new TraceReductionFilter(this.trace2buffer); + CountingFilter<TraceEventRecords> traceCounter = this.traceCounterFactory.create(); + ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create(); + EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); + + // connect stages + this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); + + SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); + SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + + SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceReductionFilter.getInputPort()); + SingleElementPipe.connect(traceReductionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); + SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort()); + + // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); + // SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort()); + + SpScPipe.connect(clock2Stage.getOutputPort(), traceReductionFilter.getTriggerInputPort(), 10); + SpScPipe.connect(clockStage.getOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); + + // create and configure pipeline + Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>(); + pipeline.setFirstStage(relay); + pipeline.addIntermediateStage(instanceOfFilter); + pipeline.addIntermediateStage(traceReconstructionFilter); + pipeline.addIntermediateStage(traceReductionFilter); + pipeline.addIntermediateStage(traceThroughputFilter); + pipeline.setLastStage(endStage); + return pipeline; + } + + @Override + public void start() { + super.start(); + + this.tcpThread.start(); + this.clockThread.start(); + this.clock2Thread.start(); + + for (Thread workerThread : this.workerThreads) { + workerThread.start(); + } + + try { + this.tcpThread.join(); + + for (Thread workerThread : this.workerThreads) { + workerThread.join(); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + this.clockThread.interrupt(); + this.clock2Thread.interrupt(); + } + + public List<TraceEventRecords> getElementCollection() { + return this.elementCollection; + } + + public int getNumRecords() { + int sum = 0; + for (CountingFilter<IMonitoringRecord> stage : this.recordCounterFactory.getStages()) { + sum += stage.getNumElementsPassed(); + } + return sum; + } + + public int getNumTraces() { + int sum = 0; + for (CountingFilter<TraceEventRecords> stage : this.traceCounterFactory.getStages()) { + sum += stage.getNumElementsPassed(); + } + return sum; + } + + public List<Long> getRecordDelays() { + List<Long> throughputs = new LinkedList<Long>(); + for (ElementDelayMeasuringStage<IMonitoringRecord> stage : this.recordThroughputFilterFactory.getStages()) { + throughputs.addAll(stage.getDelays()); + } + return throughputs; + } + + public List<Long> getTraceThroughputs() { + List<Long> throughputs = new LinkedList<Long>(); + for (ElementThroughputMeasuringStage<TraceEventRecords> stage : this.traceThroughputFilterFactory.getStages()) { + throughputs.addAll(stage.getThroughputs()); + } + return throughputs; + } + + public SpScPipe<IMonitoringRecord> getTcpRelayPipe() { + return this.tcpRelayPipe; + } + + public int getNumWorkerThreads() { + return this.numWorkerThreads; + } + + public void setNumWorkerThreads(final int numWorkerThreads) { + this.numWorkerThreads = numWorkerThreads; + } + +}