diff --git a/conf/logging.properties b/conf/logging.properties index 19edacb8c3100024c710d0c5fd1abe4103af6f16..1fbc6e666e68a074a35511fe5bcb2a9820e018ca 100644 --- a/conf/logging.properties +++ b/conf/logging.properties @@ -8,5 +8,5 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n #teetime.level = ALL teetime.variant.methodcallWithPorts.framework.core.level = ALL -#teetime.variant.methodcallWithPorts.stage.level = ALL -teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE \ No newline at end of file +teetime.variant.methodcallWithPorts.stage.level = WARNING +teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java index 25667aebb153a7d1f5dbdbc50bf9979fd0f87599..4d066d56fe605d3bb40e3fc457c6284ae79a7cf0 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java @@ -81,8 +81,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { @Override public void executeWithPorts() { - this.logger.debug("Executing stage..."); - // StageWithPort<?, ?> headStage = this.currentHeads.next(); StageWithPort<?, ?> headStage = this.stages[this.firstStageIndex]; @@ -187,8 +185,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { this.firstStage = null; this.intermediateStages.clear(); this.lastStage = null; - - System.out.println("cleaned up"); } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java index d5a462efc9933d8eff88a7e389739c086a89c447..51658b1efad629c9839069490f482c20fe47aa22 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java @@ -25,7 +25,7 @@ public class Clock extends ProducerStage<Void, Long> { this.sleep(this.intervalDelayInMs); } - this.logger.debug("Emitting timestamp"); + // this.logger.debug("Emitting timestamp"); this.send(this.getCurrentTimeInNs()); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementDelayMeasuringStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementDelayMeasuringStage.java new file mode 100644 index 0000000000000000000000000000000000000000..771cde6a08ebaac1bcd8aa43ba63e1d1b3160b47 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementDelayMeasuringStage.java @@ -0,0 +1,58 @@ +package teetime.variant.methodcallWithPorts.stage; + +import java.util.LinkedList; +import java.util.List; + +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; + +public class ElementDelayMeasuringStage<T> extends ConsumerStage<T, T> { + + private final InputPort<Long> triggerInputPort = new InputPort<Long>(this); + + private long numPassedElements; + private long lastTimestampInNs; + + private final List<Long> delays = new LinkedList<Long>(); + + @Override + protected void execute5(final T element) { + Long timestampInNs = this.triggerInputPort.receive(); + if (timestampInNs != null) { + this.computeElementDelay(System.nanoTime()); + } + this.numPassedElements++; + this.send(element); + } + + @Override + public void onStart() { + this.resetTimestamp(System.nanoTime()); + super.onStart(); + } + + private void computeElementDelay(final Long timestampInNs) { + long diffInNs = timestampInNs - this.lastTimestampInNs; + if (this.numPassedElements > 0) { + long delayInNsPerElement = diffInNs / this.numPassedElements; + this.delays.add(delayInNsPerElement); + this.logger.info("Delay: " + delayInNsPerElement + " time units/element"); + + this.resetTimestamp(timestampInNs); + } + } + + private void resetTimestamp(final Long timestampInNs) { + this.numPassedElements = 0; + this.lastTimestampInNs = timestampInNs; + } + + public List<Long> getDelays() { + return this.delays; + } + + public InputPort<Long> getTriggerInputPort() { + return this.triggerInputPort; + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ThroughputFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java similarity index 54% rename from src/main/java/teetime/variant/methodcallWithPorts/stage/ThroughputFilter.java rename to src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java index c15f8610445423c201b5607bba6bdf5b4f0526fd..30b5f64a500ce82817389a87c12aff6494a0f4fa 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ThroughputFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java @@ -2,17 +2,16 @@ package teetime.variant.methodcallWithPorts.stage; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.TimeUnit; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.InputPort; -public class ThroughputFilter<T> extends ConsumerStage<T, T> { +public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> { private final InputPort<Long> triggerInputPort = new InputPort<Long>(this); private long numPassedElements; - private long timestamp; + private long lastTimestampInNs; private final List<Long> throughputs = new LinkedList<Long>(); @@ -20,8 +19,7 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> { protected void execute5(final T element) { Long timestampInNs = this.triggerInputPort.receive(); if (timestampInNs != null) { - this.computeThroughput(); - this.resetTimestamp(); + this.computeElementThroughput(System.nanoTime()); } this.numPassedElements++; this.send(element); @@ -29,24 +27,24 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> { @Override public void onStart() { - this.resetTimestamp(); + this.resetTimestamp(System.nanoTime()); super.onStart(); } - private void computeThroughput() { - long diffInNs = System.nanoTime() - this.timestamp; - long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs); - long throughputPerMs = this.numPassedElements / diffInMs; - this.throughputs.add(throughputPerMs); - // this.logger.info("Throughput: " + throughputPerMs + " elements/ms"); + private void computeElementThroughput(final Long timestampInNs) { + long diffInNs = timestampInNs - this.lastTimestampInNs; + if (diffInNs > 0) { + long throughputInNsPerElement = this.numPassedElements / diffInNs; + this.throughputs.add(throughputInNsPerElement); + this.logger.info("Throughput: " + throughputInNsPerElement + " elements/time unit"); - // long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs); - // long throughputPerSec = this.numPassedElements / diffInSec; + this.resetTimestamp(timestampInNs); + } } - private void resetTimestamp() { + private void resetTimestamp(final Long timestampInNs) { this.numPassedElements = 0; - this.timestamp = System.nanoTime(); + this.lastTimestampInNs = timestampInNs; } public List<Long> getThroughputs() { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java index 1fddc4ccb73fb648e65c4fa81c264c1a7b36e9e3..21df773f8a4b60c87ed328f7155162fc4eb1607b 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java @@ -79,7 +79,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); } @Test @@ -106,7 +106,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); assertThat(quintiles.get(0.5), is(both(greaterThan(1700l)).and(lessThan(1900l)))); } @@ -135,7 +135,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { assertEquals(1, trace1.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java index 316af86901cf02b35615b7be7a78db5774c8b06a..41729c527f3a7420e73ebee295bb169fef9266f3 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java @@ -61,7 +61,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest { } Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); // assertEquals(1000, analysis.getNumTraces()); assertEquals(1000000, analysis.getNumTraces()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java index 87702a47a98969ad255585461ba15ee0584ca645..82f3e6283b857c11f3f2334897575d974e57c8b5 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java @@ -79,7 +79,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); } @Test @@ -106,7 +106,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l)))); } @@ -135,7 +135,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { assertEquals(1, trace1.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/NieWorkKiekerTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/NieWorkKiekerTraceReconstructionAnalysisTest.java index 4a9f9fc82ce80d4573dc405454c84f16f9316b48..d50794c36658d37072ccc24e0cb236768a7af2fc 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/NieWorkKiekerTraceReconstructionAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/NieWorkKiekerTraceReconstructionAnalysisTest.java @@ -76,7 +76,7 @@ public class NieWorkKiekerTraceReconstructionAnalysisTest { assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Mean throughput: " + quintiles.get(0.5) + " time units/element"); } @Test @@ -102,7 +102,7 @@ public class NieWorkKiekerTraceReconstructionAnalysisTest { assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Mean throughput: " + quintiles.get(0.5) + " time units/element"); assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l)))); } @@ -130,7 +130,7 @@ public class NieWorkKiekerTraceReconstructionAnalysisTest { assertEquals(1, trace1.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Mean throughput: " + quintiles.get(0.5) + " time units/element"); } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java index 544ac57b6a8c44d95211794b8d5d543a21938ca7..11beed81bd4adbb1b8c8a0582fcadb5840d74cc9 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java @@ -12,9 +12,9 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.CountingFilter; import teetime.variant.methodcallWithPorts.stage.Distributor; +import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage; import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; -import teetime.variant.methodcallWithPorts.stage.ThroughputFilter; import teetime.variant.methodcallWithPorts.stage.io.TCPReader; import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter; @@ -34,8 +34,8 @@ public class TcpTraceReconstructionAnalysis extends Analysis { private CountingFilter<TraceEventRecords> traceCounter; - private ThroughputFilter<IFlowRecord> recordThroughputFilter; - private ThroughputFilter<TraceEventRecords> traceThroughputFilter; + private ElementThroughputMeasuringStage<IFlowRecord> recordThroughputFilter; + private ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter; @Override public void init() { @@ -70,9 +70,9 @@ public class TcpTraceReconstructionAnalysis extends Analysis { this.recordCounter = new CountingFilter<IMonitoringRecord>(); final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); - this.recordThroughputFilter = new ThroughputFilter<IFlowRecord>(); + this.recordThroughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>(); final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(); - this.traceThroughputFilter = new ThroughputFilter<TraceEventRecords>(); + this.traceThroughputFilter = new ElementThroughputMeasuringStage<TraceEventRecords>(); this.traceCounter = new CountingFilter<TraceEventRecords>(); EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java index 071a7aa07daafd9256a09dc11f6eebcde9475008..7b02a5d9f79eda21ae58b191710683413b4ca043 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java @@ -14,8 +14,8 @@ import teetime.variant.methodcallWithPorts.stage.Cache; import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.CountingFilter; +import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; -import teetime.variant.methodcallWithPorts.stage.ThroughputFilter; import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter; import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter; @@ -40,7 +40,7 @@ public class TraceReconstructionAnalysis extends Analysis { private CountingFilter<TraceEventRecords> traceCounter; - private ThroughputFilter<IFlowRecord> throughputFilter; + private ElementThroughputMeasuringStage<IFlowRecord> throughputFilter; private File inputDir; @@ -72,7 +72,7 @@ public class TraceReconstructionAnalysis extends Analysis { final StringBufferFilter<IMonitoringRecord> stringBufferFilter = new StringBufferFilter<IMonitoringRecord>(); final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); - this.throughputFilter = new ThroughputFilter<IFlowRecord>(); + this.throughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>(); final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(); this.traceCounter = new CountingFilter<TraceEventRecords>(); final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java index 2353d5232014ce3f7383222697131d3e317bdaeb..ed63b1a26d34693342c61736d4ef2156f22cc371 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -80,8 +80,11 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize()); - Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); + Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays()); + System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record"); + + Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays()); + System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace"); // assertEquals(1000, analysis.getNumTraces()); assertEquals(1000000, analysis.getNumTraces()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java index 8cea1e903676ad46f54b82207829791dd5eb5fbc..cc36887cbe715ed6b49f34463f5c3531d231a72d 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java @@ -1,5 +1,8 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -12,10 +15,10 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.CountingFilter; import teetime.variant.methodcallWithPorts.stage.Distributor; +import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage; import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; import teetime.variant.methodcallWithPorts.stage.Relay; -import teetime.variant.methodcallWithPorts.stage.ThroughputFilter; import teetime.variant.methodcallWithPorts.stage.io.TCPReader; import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter; @@ -35,13 +38,6 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { private Thread clock2Thread; private Thread[] workerThreads; - private CountingFilter<IMonitoringRecord> recordCounter; - - private CountingFilter<TraceEventRecords> traceCounter; - - private ThroughputFilter<IFlowRecord> recordThroughputFilter; - private ThroughputFilter<TraceEventRecords> traceThroughputFilter; - private SpScPipe<IMonitoringRecord> tcpRelayPipe; private int numWorkerThreads; @@ -81,6 +77,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); + clock.setInitialDelayInMs(intervalDelayInMs); clock.setIntervalDelayInMs(intervalDelayInMs); Distributor<Long> distributor = new Distributor<Long>(); @@ -93,18 +90,66 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { return pipeline; } + private static class StageFactory<T extends StageWithPort<?, ?>> { + + private final Constructor<T> constructor; + private final List<T> stages = new ArrayList<T>(); + + public StageFactory(final Constructor<T> constructor) { + this.constructor = constructor; + } + + public T create(final Object... initargs) { + try { + T stage = this.constructor.newInstance(initargs); + this.stages.add(stage); + return stage; + } catch (InstantiationException e) { + throw new IllegalStateException(e); + } catch (IllegalAccessException e) { + throw new IllegalStateException(e); + } catch (IllegalArgumentException e) { + throw new IllegalStateException(e); + } catch (InvocationTargetException e) { + throw new IllegalStateException(e); + } + } + + public List<T> getStages() { + return this.stages; + } + } + + private final StageFactory<CountingFilter<IMonitoringRecord>> recordCounterFactory; + private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory; + private final StageFactory<CountingFilter<TraceEventRecords>> traceCounterFactory; + private final StageFactory<ElementDelayMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory; + + public TcpTraceReconstructionAnalysisWithThreads() { + try { + this.recordCounterFactory = new StageFactory(CountingFilter.class.getConstructor()); + this.recordThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor()); + this.traceCounterFactory = new StageFactory(CountingFilter.class.getConstructor()); + this.traceThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor()); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException(e); + } catch (SecurityException e) { + throw new IllegalArgumentException(e); + } + } + private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline, final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); - this.recordCounter = new CountingFilter<IMonitoringRecord>(); + CountingFilter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); - this.recordThroughputFilter = new ThroughputFilter<IFlowRecord>(); + ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create(); final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(); - this.traceThroughputFilter = new ThroughputFilter<TraceEventRecords>(); - this.traceCounter = new CountingFilter<TraceEventRecords>(); + CountingFilter<TraceEventRecords> traceCounter = this.traceCounterFactory.create(); + ElementDelayMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create(); // EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>(); @@ -118,7 +163,9 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { // SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); // SingleElementPipe.connect(relay.getOutputPort(), sysout.getInputPort()); // SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort()); - SingleElementPipe.connect(relay.getOutputPort(), endStage.getInputPort()); + SingleElementPipe.connect(relay.getOutputPort(), recordThroughputFilter.getInputPort()); + SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), endStage.getInputPort()); + // // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); // // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); @@ -127,15 +174,15 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { // // SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort()); // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort()); - SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10); - // SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10); - SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10); + // SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10); + SpScPipe.connect(clockStage.getOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10); + // SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10); // create and configure pipeline // Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>(); Pipeline<IMonitoringRecord, IMonitoringRecord> pipeline = new Pipeline<IMonitoringRecord, IMonitoringRecord>(); pipeline.setFirstStage(relay); - // pipeline.addIntermediateStage(this.recordCounter); + pipeline.addIntermediateStage(recordThroughputFilter); // pipeline.addIntermediateStage(sysout); // pipeline.addIntermediateStage(instanceOfFilter); // pipeline.addIntermediateStage(this.recordThroughputFilter); @@ -151,7 +198,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { super.start(); this.tcpThread.start(); - // this.clockThread.start(); + this.clockThread.start(); // this.clock2Thread.start(); for (Thread workerThread : this.workerThreads) { @@ -176,19 +223,35 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { } public int getNumRecords() { - return this.recordCounter.getNumElementsPassed(); + int sum = 0; + for (CountingFilter<IMonitoringRecord> stage : this.recordCounterFactory.getStages()) { + sum += stage.getNumElementsPassed(); + } + return sum; } public int getNumTraces() { - return this.traceCounter.getNumElementsPassed(); + int sum = 0; + for (CountingFilter<TraceEventRecords> stage : this.traceCounterFactory.getStages()) { + sum += stage.getNumElementsPassed(); + } + return sum; } - public List<Long> getRecordThroughputs() { - return this.recordThroughputFilter.getThroughputs(); + public List<Long> getRecordDelays() { + List<Long> throughputs = new LinkedList<Long>(); + for (ElementDelayMeasuringStage<IMonitoringRecord> stage : this.recordThroughputFilterFactory.getStages()) { + throughputs.addAll(stage.getDelays()); + } + return throughputs; } - public List<Long> getTraceThroughputs() { - return this.traceThroughputFilter.getThroughputs(); + public List<Long> getTraceDelays() { + List<Long> throughputs = new LinkedList<Long>(); + for (ElementDelayMeasuringStage<TraceEventRecords> stage : this.traceThroughputFilterFactory.getStages()) { + throughputs.addAll(stage.getDelays()); + } + return throughputs; } public SpScPipe<IMonitoringRecord> getTcpRelayPipe() {