diff --git a/src/main/java/teetime/util/StatisticsUtil.java b/src/main/java/teetime/util/StatisticsUtil.java index c27035aeead5e9e91ae387f442634829a4b00cf9..a5120bba7478b03daa2dd41866b88cfe523ae629 100644 --- a/src/main/java/teetime/util/StatisticsUtil.java +++ b/src/main/java/teetime/util/StatisticsUtil.java @@ -17,6 +17,7 @@ package teetime.util; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -111,4 +112,15 @@ public class StatisticsUtil { return quintileValues; } + public static void removeFirstZeroThroughputs(final List<Long> throughputs) { + Iterator<Long> iterator = throughputs.iterator(); + while (iterator.hasNext()) { + if (iterator.next() == 0) { + iterator.remove(); + } else { + break; + } + } + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java index 3d70d4b9256c3387b9c5606a757d6d861f1c212d..c28d0bcfca564986282b0c0898818a10e7498829 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java @@ -21,8 +21,6 @@ import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.Signal; -import kieker.common.record.IMonitoringRecord; - /** * * This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port. @@ -97,7 +95,7 @@ public class Merger<T> extends AbstractStage { return super.getInputPorts(); } - public InputPort<IMonitoringRecord> getNewInputPort() { + public InputPort<T> getNewInputPort() { return this.createInputPort(); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java index 5d5254953a4e5e605aa6a5c6d282a7cd7898cce2..4693eea8765030544096d89d1b192cfb569c52cc 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java @@ -34,7 +34,8 @@ import kieker.common.record.flow.trace.TraceMetadata; */ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> { - private final OutputPort<TraceEventRecords> outputPort = this.createOutputPort(); + private final OutputPort<TraceEventRecords> traceValidOutputPort = this.createOutputPort(); + private final OutputPort<TraceEventRecords> traceInvalidOutputPort = this.createOutputPort(); // TODO send output to this port private TimeUnit timeunit; private long maxTraceDuration = Long.MAX_VALUE; @@ -52,16 +53,25 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> { protected void execute(final IFlowRecord element) { final Long traceId = this.reconstructTrace(element); if (traceId != null) { - this.putIfFinished(traceId); + this.put(traceId, true); } } - private void putIfFinished(final Long traceId) { + private void put(final Long traceId, final boolean onlyIfFinished) { final TraceBuffer traceBuffer = this.traceId2trace.get(traceId); - if (traceBuffer != null && traceBuffer.isFinished()) { // null-check to check whether the trace has already been sent and removed - boolean removed = (null != this.traceId2trace.remove(traceId)); - if (removed) { - this.put(traceBuffer); + if (traceBuffer != null) { // null-check to check whether the trace has already been sent and removed + boolean shouldSend; + if (onlyIfFinished) { + shouldSend = traceBuffer.isFinished(); + } else { + shouldSend = true; + } + + if (shouldSend) { + boolean removed = (null != this.traceId2trace.remove(traceId)); + if (removed) { + this.sendTraceBuffer(traceBuffer); + } } } } @@ -86,17 +96,16 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> { @Override public void onIsPipelineHead() { for (Long traceId : this.traceId2trace.keySet()) { - this.putIfFinished(traceId); // FIXME also put invalid traces at the end + this.put(traceId, false); } super.onIsPipelineHead(); } - private void put(final TraceBuffer traceBuffer) { - // final IOutputPort<TraceReconstructionFilter, TraceEventRecords> outputPort = - // (traceBuffer.isInvalid()) ? this.traceInvalidOutputPort : this.traceValidOutputPort; - // context.put(outputPort, traceBuffer.toTraceEvents()); - this.send(this.outputPort, traceBuffer.toTraceEvents()); + private void sendTraceBuffer(final TraceBuffer traceBuffer) { + OutputPort<TraceEventRecords> outputPort = (traceBuffer.isInvalid()) ? this.traceInvalidOutputPort + : this.traceValidOutputPort; + this.send(outputPort, traceBuffer.toTraceEvents()); } public TimeUnit getTimeunit() { @@ -131,8 +140,12 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> { this.maxEncounteredLoggingTimestamp = maxEncounteredLoggingTimestamp; } - public OutputPort<TraceEventRecords> getOutputPort() { - return this.outputPort; + public OutputPort<TraceEventRecords> getTraceValidOutputPort() { + return this.traceValidOutputPort; + } + + public OutputPort<TraceEventRecords> getTraceInvalidOutputPort() { + return this.traceInvalidOutputPort; } // public Map<Long, TraceBuffer> getTraceId2trace() { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java index 5c5c568ab4b0a7e9cfed8093b2fa31faa5365995..b6c76cd83262d09df091e81e06cf97e3753bdb94 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java @@ -80,7 +80,7 @@ public class TcpTraceReconstruction extends Analysis { SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort()); + SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), endStage.getInputPort()); // create and configure pipeline Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java index b450cc2055e162988a98dbcbedd77a59bea8efd5..e01c1b521b5bb376b1d65fda743a60f1409227a6 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java @@ -107,7 +107,7 @@ public class TcpTraceReduction extends Analysis { SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceReductionFilter.getInputPort()); + SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), traceReductionFilter.getInputPort()); SingleElementPipe.connect(traceReductionFilter.getOutputPort(), endStage.getInputPort()); SpScPipe.connect(clockStage.getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java index 26f2baf49f26538007e9049325b8baee207f98de..670435766e2083bbcd69e9fae9161e7170a786c3 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java @@ -69,8 +69,9 @@ public class ChwHomeTraceReconstructionAnalysisTest { analysis.onTerminate(); } + StatisticsUtil.removeFirstZeroThroughputs(analysis.getThroughputs()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); assertEquals(50002, analysis.getNumRecords()); assertEquals(2, analysis.getNumTraces()); @@ -80,6 +81,8 @@ public class ChwHomeTraceReconstructionAnalysisTest { TraceEventRecords trace6886 = analysis.getElementCollection().get(1); assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); + + assertThat(quintiles.get(0.5), is(both(greaterThan(34l)).and(lessThan(320l)))); } @Test @@ -96,6 +99,10 @@ public class ChwHomeTraceReconstructionAnalysisTest { analysis.onTerminate(); } + StatisticsUtil.removeFirstZeroThroughputs(analysis.getThroughputs()); + Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); + System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); + assertEquals(1489902, analysis.getNumRecords()); assertEquals(24013, analysis.getNumTraces()); @@ -105,9 +112,6 @@ public class ChwHomeTraceReconstructionAnalysisTest { TraceEventRecords trace1 = analysis.getElementCollection().get(1); assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId()); - Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); - assertThat(quintiles.get(0.5), is(both(greaterThan(1700l)).and(lessThan(1900l)))); } @@ -125,6 +129,10 @@ public class ChwHomeTraceReconstructionAnalysisTest { analysis.onTerminate(); } + StatisticsUtil.removeFirstZeroThroughputs(analysis.getThroughputs()); + Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); + System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); + assertEquals(17371, analysis.getNumRecords()); assertEquals(22, analysis.getNumTraces()); @@ -134,8 +142,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { TraceEventRecords trace1 = analysis.getElementCollection().get(1); assertEquals(1, trace1.getTraceMetadata().getTraceId()); - Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); + assertThat(quintiles.get(0.5), is(both(greaterThan(200l)).and(lessThan(250l)))); } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java index 1bfbc094e63976aea4fe68b90033dabf14069902..5e112cd7e1c924449cbba81f6f328badc8be3f8a 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java @@ -23,8 +23,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import java.io.File; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -80,7 +78,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { TraceEventRecords trace6886 = analysis.getElementCollection().get(1); assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); - this.removeFirstZeroThroughputs(analysis); + StatisticsUtil.removeFirstZeroThroughputs(analysis.getThroughputs()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); } @@ -108,7 +106,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { TraceEventRecords trace1 = analysis.getElementCollection().get(1); assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId()); - this.removeFirstZeroThroughputs(analysis); + StatisticsUtil.removeFirstZeroThroughputs(analysis.getThroughputs()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); @@ -138,21 +136,9 @@ public class ChwWorkTraceReconstructionAnalysisTest { TraceEventRecords trace1 = analysis.getElementCollection().get(1); assertEquals(1, trace1.getTraceMetadata().getTraceId()); - this.removeFirstZeroThroughputs(analysis); + StatisticsUtil.removeFirstZeroThroughputs(analysis.getThroughputs()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); } - private void removeFirstZeroThroughputs(final TraceReconstructionAnalysis analysis) { - List<Long> throughputs = analysis.getThroughputs(); - Iterator<Long> iterator = throughputs.iterator(); - while (iterator.hasNext()) { - if (iterator.next() == 0) { - iterator.remove(); - } else { - break; - } - } - } - } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java index 57bf8054baecae482e50f21c0c60f2ed5dfbf85c..a26578dea46c53fae8c302554b8c0239d72c9092 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java @@ -89,7 +89,7 @@ public class TcpTraceReconstructionAnalysis extends Analysis { // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort()); // SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceCounter.getInputPort()); + SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort()); SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort()); SpScPipe.connect(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java index 26ae3b495719efa43aded35d8b76ea6f9108f4b4..5b9bce4c53589173f00d08ea9ff0f36fa17c061e 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java @@ -18,6 +18,7 @@ import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.Counter; import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; +import teetime.variant.methodcallWithPorts.stage.basic.merger.Merger; import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter; import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter; @@ -75,6 +76,7 @@ public class TraceReconstructionAnalysis extends Analysis { IFlowRecord.class); this.throughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>(); final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace); + Merger<TraceEventRecords> merger = new Merger<TraceEventRecords>(); this.traceCounter = new Counter<TraceEventRecords>(); final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection); @@ -91,7 +93,9 @@ public class TraceReconstructionAnalysis extends Analysis { SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.throughputFilter.getInputPort()); SingleElementPipe.connect(this.throughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceCounter.getInputPort()); + SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), merger.getNewInputPort()); + SingleElementPipe.connect(traceReconstructionFilter.getTraceInvalidOutputPort(), merger.getNewInputPort()); + SingleElementPipe.connect(merger.getOutputPort(), this.traceCounter.getInputPort()); SingleElementPipe.connect(this.traceCounter.getOutputPort(), collector.getInputPort()); SpScPipe.connect(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1); @@ -108,6 +112,7 @@ public class TraceReconstructionAnalysis extends Analysis { pipeline.addIntermediateStage(instanceOfFilter); pipeline.addIntermediateStage(this.throughputFilter); pipeline.addIntermediateStage(traceReconstructionFilter); + pipeline.addIntermediateStage(merger); pipeline.addIntermediateStage(this.traceCounter); pipeline.setLastStage(collector); return pipeline; diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java index 0817310f8f0a2e9bbb812632ff83e8cafe11fdbb..0a718b4fa08c2d32dcd1ab4302ac9074c36b91ab 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java @@ -184,7 +184,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), traceMetadataCounter.getInputPort()); SingleElementPipe.connect(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort()); SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceCounter.getInputPort()); + SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), traceCounter.getInputPort()); // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); // SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort()); SingleElementPipe.connect(traceCounter.getOutputPort(), endStage.getInputPort()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java index 85fa98ffac141865daea166bbe2e5833289aa3db..3f1d0ae1f45280256f9a30569c176ceac84e8ae2 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java @@ -178,7 +178,7 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { SingleElementPipe.connect(recordCounter.getOutputPort(), traceMetadataCounter.getInputPort()); SingleElementPipe.connect(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort()); SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceReductionFilter.getInputPort()); + SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), traceReductionFilter.getInputPort()); SingleElementPipe.connect(traceReductionFilter.getOutputPort(), traceCounter.getInputPort()); SingleElementPipe.connect(traceCounter.getOutputPort(), traceThroughputFilter.getInputPort()); SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort());