diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ThroughputFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ThroughputFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..918611088bf6dfbef5749a2dde93ec3bb60dcc35 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ThroughputFilter.java @@ -0,0 +1,55 @@ +package teetime.variant.methodcallWithPorts.stage; + +import java.util.LinkedList; +import java.util.List; + +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; + +public class ThroughputFilter<T> extends ConsumerStage<T, T> { + + private final InputPort<Long> triggerInputPort = new InputPort<Long>(this); + + private long numPassedElements; + private long timestamp; + + private final List<Long> throughputs = new LinkedList<Long>(); + + @Override + protected void execute5(final T element) { + Long trigger = this.triggerInputPort.receive(); + if (trigger != null) { + this.computeThroughput(); + this.resetTimestamp(); + } + this.numPassedElements++; + this.send(element); + } + + @Override + public void onStart() { + this.resetTimestamp(); + super.onStart(); + } + + private void computeThroughput() { + long diffInNs = System.nanoTime() - this.timestamp; + long throughput = this.numPassedElements / diffInNs; + // this.throughputs.add(throughput); + this.logger.info("Throughput: " + throughput + " ns"); + } + + private void resetTimestamp() { + this.numPassedElements = 0; + this.timestamp = System.nanoTime(); + } + + public List<Long> getThroughputs() { + return this.throughputs; + } + + public InputPort<Long> getTriggerInputPort() { + return triggerInputPort; + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java index 633615096c79c470de1d81addd4279656b735f72..5e8e35c0528070f35d2cc9fe328730bebba55515 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java @@ -108,14 +108,13 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE @Override public void onIsPipelineHead() { - this.logger.info("traceId2trace: " + TraceReconstructionFilter.traceId2trace.keySet()); - Iterator<TraceBuffer> iterator = TraceReconstructionFilter.traceId2trace.values().iterator(); while (iterator.hasNext()) { TraceBuffer traceBuffer = iterator.next(); this.put(traceBuffer); iterator.remove(); } + super.onIsPipelineHead(); } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java index 60dc966ce3a8ee88cd9daac7bd23ea6936a09525..c200515b8d8039e1efd05b641d12105d6e244dbe 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java @@ -10,9 +10,12 @@ import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Cache; +import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.CountingFilter; +import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; +import teetime.variant.methodcallWithPorts.stage.ThroughputFilter; import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter; import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter; @@ -36,14 +39,29 @@ public class TraceReconstructionAnalysis extends Analysis { private CountingFilter<TraceEventRecords> traceCounter; + private ThroughputFilter<TraceEventRecords> throughputFilter; + @Override public void init() { super.init(); - Pipeline<File, Void> producerPipeline = this.buildProducerPipeline(); + Pipeline<Void, Void> clockPipeline = this.buildClockPipeline(); + this.producerThread = new Thread(new RunnableStage(clockPipeline)); + + Pipeline<File, Void> producerPipeline = this.buildProducerPipeline(clockPipeline); this.producerThread = new Thread(new RunnableStage(producerPipeline)); } - private Pipeline<File, Void> buildProducerPipeline() { + private Pipeline<Void, Void> buildClockPipeline() { + Clock clock = new Clock(); + clock.setIntervalDelayInMs(1000); + + Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>(); + pipeline.setFirstStage(clock); + pipeline.setLastStage(new EndStage<Void>()); + return pipeline; + } + + private Pipeline<File, Void> buildProducerPipeline(final Pipeline<Void, Void> clockPipeline) { this.classNameRegistryRepository = new ClassNameRegistryRepository(); // final IsIMonitoringRecordInRange isIMonitoringRecordInRange = new IsIMonitoringRecordInRange(0, 1000); @@ -62,8 +80,8 @@ public class TraceReconstructionAnalysis extends Analysis { final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(); + this.throughputFilter = new ThroughputFilter<TraceEventRecords>(); this.traceCounter = new CountingFilter<TraceEventRecords>(); - final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection); // configure stages @@ -77,9 +95,12 @@ public class TraceReconstructionAnalysis extends Analysis { SingleElementPipe.connect(cache.getOutputPort(), stringBufferFilter.getInputPort()); SingleElementPipe.connect(stringBufferFilter.getOutputPort(), instanceOfFilter.getInputPort()); SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceCounter.getInputPort()); + SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.throughputFilter.getInputPort()); + SingleElementPipe.connect(this.throughputFilter.getOutputPort(), this.traceCounter.getInputPort()); SingleElementPipe.connect(this.traceCounter.getOutputPort(), collector.getInputPort()); + SpScPipe.connect(clockPipeline.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1); + // fill input ports dir2RecordsFilter.getInputPort().getPipe().add(new File("src/test/data/Eprints-logs")); @@ -91,6 +112,7 @@ public class TraceReconstructionAnalysis extends Analysis { pipeline.addIntermediateStage(stringBufferFilter); pipeline.addIntermediateStage(instanceOfFilter); pipeline.addIntermediateStage(traceReconstructionFilter); + pipeline.addIntermediateStage(this.throughputFilter); pipeline.addIntermediateStage(this.traceCounter); pipeline.setLastStage(collector); return pipeline; @@ -120,4 +142,8 @@ public class TraceReconstructionAnalysis extends Analysis { public int getNumTraces() { return this.traceCounter.getNumElementsPassed(); } + + public List<Long> getThroughputs() { + return this.throughputFilter.getThroughputs(); + } }