From 9a2e265cb374dd095529eb65f49401330ab89d2d Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Mon, 30 Jun 2014 17:40:13 +0200 Subject: [PATCH] added delay and throughput measuring stages --- conf/logging.properties | 4 +- .../framework/core/Pipeline.java | 4 - .../methodcallWithPorts/stage/Clock.java | 2 +- .../stage/ElementDelayMeasuringStage.java | 58 +++++++++ ...a => ElementThroughputMeasuringStage.java} | 30 +++-- ...hwHomeTraceReconstructionAnalysisTest.java | 6 +- ...orkTcpTraceReconstructionAnalysisTest.java | 2 +- ...hwWorkTraceReconstructionAnalysisTest.java | 6 +- ...KiekerTraceReconstructionAnalysisTest.java | 6 +- .../TcpTraceReconstructionAnalysis.java | 10 +- .../TraceReconstructionAnalysis.java | 6 +- ...ReconstructionAnalysisWithThreadsTest.java | 7 +- ...raceReconstructionAnalysisWithThreads.java | 111 ++++++++++++++---- 13 files changed, 185 insertions(+), 67 deletions(-) create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/ElementDelayMeasuringStage.java rename src/main/java/teetime/variant/methodcallWithPorts/stage/{ThroughputFilter.java => ElementThroughputMeasuringStage.java} (54%) diff --git a/conf/logging.properties b/conf/logging.properties index 19edacb8..1fbc6e66 100644 --- a/conf/logging.properties +++ b/conf/logging.properties @@ -8,5 +8,5 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n #teetime.level = ALL teetime.variant.methodcallWithPorts.framework.core.level = ALL -#teetime.variant.methodcallWithPorts.stage.level = ALL -teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE \ No newline at end of file +teetime.variant.methodcallWithPorts.stage.level = WARNING +teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java index 25667aeb..4d066d56 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java @@ -81,8 +81,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { @Override public void executeWithPorts() { - this.logger.debug("Executing stage..."); - // StageWithPort<?, ?> headStage = this.currentHeads.next(); StageWithPort<?, ?> headStage = this.stages[this.firstStageIndex]; @@ -187,8 +185,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { this.firstStage = null; this.intermediateStages.clear(); this.lastStage = null; - - System.out.println("cleaned up"); } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java index d5a462ef..51658b1e 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java @@ -25,7 +25,7 @@ public class Clock extends ProducerStage<Void, Long> { this.sleep(this.intervalDelayInMs); } - this.logger.debug("Emitting timestamp"); + // this.logger.debug("Emitting timestamp"); this.send(this.getCurrentTimeInNs()); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementDelayMeasuringStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementDelayMeasuringStage.java new file mode 100644 index 00000000..771cde6a --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementDelayMeasuringStage.java @@ -0,0 +1,58 @@ +package teetime.variant.methodcallWithPorts.stage; + +import java.util.LinkedList; +import java.util.List; + +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; + +public class ElementDelayMeasuringStage<T> extends ConsumerStage<T, T> { + + private final InputPort<Long> triggerInputPort = new InputPort<Long>(this); + + private long numPassedElements; + private long lastTimestampInNs; + + private final List<Long> delays = new LinkedList<Long>(); + + @Override + protected void execute5(final T element) { + Long timestampInNs = this.triggerInputPort.receive(); + if (timestampInNs != null) { + this.computeElementDelay(System.nanoTime()); + } + this.numPassedElements++; + this.send(element); + } + + @Override + public void onStart() { + this.resetTimestamp(System.nanoTime()); + super.onStart(); + } + + private void computeElementDelay(final Long timestampInNs) { + long diffInNs = timestampInNs - this.lastTimestampInNs; + if (this.numPassedElements > 0) { + long delayInNsPerElement = diffInNs / this.numPassedElements; + this.delays.add(delayInNsPerElement); + this.logger.info("Delay: " + delayInNsPerElement + " time units/element"); + + this.resetTimestamp(timestampInNs); + } + } + + private void resetTimestamp(final Long timestampInNs) { + this.numPassedElements = 0; + this.lastTimestampInNs = timestampInNs; + } + + public List<Long> getDelays() { + return this.delays; + } + + public InputPort<Long> getTriggerInputPort() { + return this.triggerInputPort; + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ThroughputFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java similarity index 54% rename from src/main/java/teetime/variant/methodcallWithPorts/stage/ThroughputFilter.java rename to src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java index c15f8610..30b5f64a 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ThroughputFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java @@ -2,17 +2,16 @@ package teetime.variant.methodcallWithPorts.stage; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.TimeUnit; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.InputPort; -public class ThroughputFilter<T> extends ConsumerStage<T, T> { +public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> { private final InputPort<Long> triggerInputPort = new InputPort<Long>(this); private long numPassedElements; - private long timestamp; + private long lastTimestampInNs; private final List<Long> throughputs = new LinkedList<Long>(); @@ -20,8 +19,7 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> { protected void execute5(final T element) { Long timestampInNs = this.triggerInputPort.receive(); if (timestampInNs != null) { - this.computeThroughput(); - this.resetTimestamp(); + this.computeElementThroughput(System.nanoTime()); } this.numPassedElements++; this.send(element); @@ -29,24 +27,24 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> { @Override public void onStart() { - this.resetTimestamp(); + this.resetTimestamp(System.nanoTime()); super.onStart(); } - private void computeThroughput() { - long diffInNs = System.nanoTime() - this.timestamp; - long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs); - long throughputPerMs = this.numPassedElements / diffInMs; - this.throughputs.add(throughputPerMs); - // this.logger.info("Throughput: " + throughputPerMs + " elements/ms"); + private void computeElementThroughput(final Long timestampInNs) { + long diffInNs = timestampInNs - this.lastTimestampInNs; + if (diffInNs > 0) { + long throughputInNsPerElement = this.numPassedElements / diffInNs; + this.throughputs.add(throughputInNsPerElement); + this.logger.info("Throughput: " + throughputInNsPerElement + " elements/time unit"); - // long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs); - // long throughputPerSec = this.numPassedElements / diffInSec; + this.resetTimestamp(timestampInNs); + } } - private void resetTimestamp() { + private void resetTimestamp(final Long timestampInNs) { this.numPassedElements = 0; - this.timestamp = System.nanoTime(); + this.lastTimestampInNs = timestampInNs; } public List<Long> getThroughputs() { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java index 1fddc4cc..21df773f 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java @@ -79,7 +79,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); } @Test @@ -106,7 +106,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); assertThat(quintiles.get(0.5), is(both(greaterThan(1700l)).and(lessThan(1900l)))); } @@ -135,7 +135,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { assertEquals(1, trace1.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java index 316af869..41729c52 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java @@ -61,7 +61,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest { } Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); // assertEquals(1000, analysis.getNumTraces()); assertEquals(1000000, analysis.getNumTraces()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java index 87702a47..82f3e628 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java @@ -79,7 +79,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); } @Test @@ -106,7 +106,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l)))); } @@ -135,7 +135,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { assertEquals(1, trace1.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/NieWorkKiekerTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/NieWorkKiekerTraceReconstructionAnalysisTest.java index 4a9f9fc8..d50794c3 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/NieWorkKiekerTraceReconstructionAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/NieWorkKiekerTraceReconstructionAnalysisTest.java @@ -76,7 +76,7 @@ public class NieWorkKiekerTraceReconstructionAnalysisTest { assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Mean throughput: " + quintiles.get(0.5) + " time units/element"); } @Test @@ -102,7 +102,7 @@ public class NieWorkKiekerTraceReconstructionAnalysisTest { assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Mean throughput: " + quintiles.get(0.5) + " time units/element"); assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l)))); } @@ -130,7 +130,7 @@ public class NieWorkKiekerTraceReconstructionAnalysisTest { assertEquals(1, trace1.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Mean throughput: " + quintiles.get(0.5) + " time units/element"); } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java index 544ac57b..11beed81 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java @@ -12,9 +12,9 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.CountingFilter; import teetime.variant.methodcallWithPorts.stage.Distributor; +import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage; import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; -import teetime.variant.methodcallWithPorts.stage.ThroughputFilter; import teetime.variant.methodcallWithPorts.stage.io.TCPReader; import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter; @@ -34,8 +34,8 @@ public class TcpTraceReconstructionAnalysis extends Analysis { private CountingFilter<TraceEventRecords> traceCounter; - private ThroughputFilter<IFlowRecord> recordThroughputFilter; - private ThroughputFilter<TraceEventRecords> traceThroughputFilter; + private ElementThroughputMeasuringStage<IFlowRecord> recordThroughputFilter; + private ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter; @Override public void init() { @@ -70,9 +70,9 @@ public class TcpTraceReconstructionAnalysis extends Analysis { this.recordCounter = new CountingFilter<IMonitoringRecord>(); final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); - this.recordThroughputFilter = new ThroughputFilter<IFlowRecord>(); + this.recordThroughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>(); final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(); - this.traceThroughputFilter = new ThroughputFilter<TraceEventRecords>(); + this.traceThroughputFilter = new ElementThroughputMeasuringStage<TraceEventRecords>(); this.traceCounter = new CountingFilter<TraceEventRecords>(); EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java index 071a7aa0..7b02a5d9 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java @@ -14,8 +14,8 @@ import teetime.variant.methodcallWithPorts.stage.Cache; import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.CountingFilter; +import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; -import teetime.variant.methodcallWithPorts.stage.ThroughputFilter; import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter; import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter; @@ -40,7 +40,7 @@ public class TraceReconstructionAnalysis extends Analysis { private CountingFilter<TraceEventRecords> traceCounter; - private ThroughputFilter<IFlowRecord> throughputFilter; + private ElementThroughputMeasuringStage<IFlowRecord> throughputFilter; private File inputDir; @@ -72,7 +72,7 @@ public class TraceReconstructionAnalysis extends Analysis { final StringBufferFilter<IMonitoringRecord> stringBufferFilter = new StringBufferFilter<IMonitoringRecord>(); final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); - this.throughputFilter = new ThroughputFilter<IFlowRecord>(); + this.throughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>(); final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(); this.traceCounter = new CountingFilter<TraceEventRecords>(); final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java index 2353d523..ed63b1a2 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -80,8 +80,11 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize()); - Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); + Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays()); + System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record"); + + Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays()); + System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace"); // assertEquals(1000, analysis.getNumTraces()); assertEquals(1000000, analysis.getNumTraces()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java index 8cea1e90..cc36887c 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java @@ -1,5 +1,8 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -12,10 +15,10 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.CountingFilter; import teetime.variant.methodcallWithPorts.stage.Distributor; +import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage; import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; import teetime.variant.methodcallWithPorts.stage.Relay; -import teetime.variant.methodcallWithPorts.stage.ThroughputFilter; import teetime.variant.methodcallWithPorts.stage.io.TCPReader; import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter; @@ -35,13 +38,6 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { private Thread clock2Thread; private Thread[] workerThreads; - private CountingFilter<IMonitoringRecord> recordCounter; - - private CountingFilter<TraceEventRecords> traceCounter; - - private ThroughputFilter<IFlowRecord> recordThroughputFilter; - private ThroughputFilter<TraceEventRecords> traceThroughputFilter; - private SpScPipe<IMonitoringRecord> tcpRelayPipe; private int numWorkerThreads; @@ -81,6 +77,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); + clock.setInitialDelayInMs(intervalDelayInMs); clock.setIntervalDelayInMs(intervalDelayInMs); Distributor<Long> distributor = new Distributor<Long>(); @@ -93,18 +90,66 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { return pipeline; } + private static class StageFactory<T extends StageWithPort<?, ?>> { + + private final Constructor<T> constructor; + private final List<T> stages = new ArrayList<T>(); + + public StageFactory(final Constructor<T> constructor) { + this.constructor = constructor; + } + + public T create(final Object... initargs) { + try { + T stage = this.constructor.newInstance(initargs); + this.stages.add(stage); + return stage; + } catch (InstantiationException e) { + throw new IllegalStateException(e); + } catch (IllegalAccessException e) { + throw new IllegalStateException(e); + } catch (IllegalArgumentException e) { + throw new IllegalStateException(e); + } catch (InvocationTargetException e) { + throw new IllegalStateException(e); + } + } + + public List<T> getStages() { + return this.stages; + } + } + + private final StageFactory<CountingFilter<IMonitoringRecord>> recordCounterFactory; + private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory; + private final StageFactory<CountingFilter<TraceEventRecords>> traceCounterFactory; + private final StageFactory<ElementDelayMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory; + + public TcpTraceReconstructionAnalysisWithThreads() { + try { + this.recordCounterFactory = new StageFactory(CountingFilter.class.getConstructor()); + this.recordThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor()); + this.traceCounterFactory = new StageFactory(CountingFilter.class.getConstructor()); + this.traceThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor()); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException(e); + } catch (SecurityException e) { + throw new IllegalArgumentException(e); + } + } + private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline, final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); - this.recordCounter = new CountingFilter<IMonitoringRecord>(); + CountingFilter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); - this.recordThroughputFilter = new ThroughputFilter<IFlowRecord>(); + ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create(); final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(); - this.traceThroughputFilter = new ThroughputFilter<TraceEventRecords>(); - this.traceCounter = new CountingFilter<TraceEventRecords>(); + CountingFilter<TraceEventRecords> traceCounter = this.traceCounterFactory.create(); + ElementDelayMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create(); // EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>(); @@ -118,7 +163,9 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { // SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); // SingleElementPipe.connect(relay.getOutputPort(), sysout.getInputPort()); // SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort()); - SingleElementPipe.connect(relay.getOutputPort(), endStage.getInputPort()); + SingleElementPipe.connect(relay.getOutputPort(), recordThroughputFilter.getInputPort()); + SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), endStage.getInputPort()); + // // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); // // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); @@ -127,15 +174,15 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { // // SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort()); // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort()); - SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10); - // SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10); - SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10); + // SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10); + SpScPipe.connect(clockStage.getOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10); + // SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10); // create and configure pipeline // Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>(); Pipeline<IMonitoringRecord, IMonitoringRecord> pipeline = new Pipeline<IMonitoringRecord, IMonitoringRecord>(); pipeline.setFirstStage(relay); - // pipeline.addIntermediateStage(this.recordCounter); + pipeline.addIntermediateStage(recordThroughputFilter); // pipeline.addIntermediateStage(sysout); // pipeline.addIntermediateStage(instanceOfFilter); // pipeline.addIntermediateStage(this.recordThroughputFilter); @@ -151,7 +198,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { super.start(); this.tcpThread.start(); - // this.clockThread.start(); + this.clockThread.start(); // this.clock2Thread.start(); for (Thread workerThread : this.workerThreads) { @@ -176,19 +223,35 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { } public int getNumRecords() { - return this.recordCounter.getNumElementsPassed(); + int sum = 0; + for (CountingFilter<IMonitoringRecord> stage : this.recordCounterFactory.getStages()) { + sum += stage.getNumElementsPassed(); + } + return sum; } public int getNumTraces() { - return this.traceCounter.getNumElementsPassed(); + int sum = 0; + for (CountingFilter<TraceEventRecords> stage : this.traceCounterFactory.getStages()) { + sum += stage.getNumElementsPassed(); + } + return sum; } - public List<Long> getRecordThroughputs() { - return this.recordThroughputFilter.getThroughputs(); + public List<Long> getRecordDelays() { + List<Long> throughputs = new LinkedList<Long>(); + for (ElementDelayMeasuringStage<IMonitoringRecord> stage : this.recordThroughputFilterFactory.getStages()) { + throughputs.addAll(stage.getDelays()); + } + return throughputs; } - public List<Long> getTraceThroughputs() { - return this.traceThroughputFilter.getThroughputs(); + public List<Long> getTraceDelays() { + List<Long> throughputs = new LinkedList<Long>(); + for (ElementDelayMeasuringStage<TraceEventRecords> stage : this.traceThroughputFilterFactory.getStages()) { + throughputs.addAll(stage.getDelays()); + } + return throughputs; } public SpScPipe<IMonitoringRecord> getTcpRelayPipe() { -- GitLab