Skip to content
Snippets Groups Projects
Commit b8a9c7ec authored by Christian Wulf's avatar Christian Wulf
Browse files

added ThroughputFilter

parent 1545576e
No related branches found
No related tags found
No related merge requests found
package teetime.variant.methodcallWithPorts.stage;
import java.util.LinkedList;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
public class ThroughputFilter<T> extends ConsumerStage<T, T> {
private final InputPort<Long> triggerInputPort = new InputPort<Long>(this);
private long numPassedElements;
private long timestamp;
private final List<Long> throughputs = new LinkedList<Long>();
@Override
protected void execute5(final T element) {
Long trigger = this.triggerInputPort.receive();
if (trigger != null) {
this.computeThroughput();
this.resetTimestamp();
}
this.numPassedElements++;
this.send(element);
}
@Override
public void onStart() {
this.resetTimestamp();
super.onStart();
}
private void computeThroughput() {
long diffInNs = System.nanoTime() - this.timestamp;
long throughput = this.numPassedElements / diffInNs;
// this.throughputs.add(throughput);
this.logger.info("Throughput: " + throughput + " ns");
}
private void resetTimestamp() {
this.numPassedElements = 0;
this.timestamp = System.nanoTime();
}
public List<Long> getThroughputs() {
return this.throughputs;
}
public InputPort<Long> getTriggerInputPort() {
return triggerInputPort;
}
}
...@@ -108,14 +108,13 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE ...@@ -108,14 +108,13 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE
@Override @Override
public void onIsPipelineHead() { public void onIsPipelineHead() {
this.logger.info("traceId2trace: " + TraceReconstructionFilter.traceId2trace.keySet());
Iterator<TraceBuffer> iterator = TraceReconstructionFilter.traceId2trace.values().iterator(); Iterator<TraceBuffer> iterator = TraceReconstructionFilter.traceId2trace.values().iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
TraceBuffer traceBuffer = iterator.next(); TraceBuffer traceBuffer = iterator.next();
this.put(traceBuffer); this.put(traceBuffer);
iterator.remove(); iterator.remove();
} }
super.onIsPipelineHead(); super.onIsPipelineHead();
} }
......
...@@ -10,9 +10,12 @@ import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; ...@@ -10,9 +10,12 @@ import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Cache; import teetime.variant.methodcallWithPorts.stage.Cache;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.CountingFilter; import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.ThroughputFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter; import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter; import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
...@@ -36,14 +39,29 @@ public class TraceReconstructionAnalysis extends Analysis { ...@@ -36,14 +39,29 @@ public class TraceReconstructionAnalysis extends Analysis {
private CountingFilter<TraceEventRecords> traceCounter; private CountingFilter<TraceEventRecords> traceCounter;
private ThroughputFilter<TraceEventRecords> throughputFilter;
@Override @Override
public void init() { public void init() {
super.init(); super.init();
Pipeline<File, Void> producerPipeline = this.buildProducerPipeline(); Pipeline<Void, Void> clockPipeline = this.buildClockPipeline();
this.producerThread = new Thread(new RunnableStage(clockPipeline));
Pipeline<File, Void> producerPipeline = this.buildProducerPipeline(clockPipeline);
this.producerThread = new Thread(new RunnableStage(producerPipeline)); this.producerThread = new Thread(new RunnableStage(producerPipeline));
} }
private Pipeline<File, Void> buildProducerPipeline() { private Pipeline<Void, Void> buildClockPipeline() {
Clock clock = new Clock();
clock.setIntervalDelayInMs(1000);
Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>();
pipeline.setFirstStage(clock);
pipeline.setLastStage(new EndStage<Void>());
return pipeline;
}
private Pipeline<File, Void> buildProducerPipeline(final Pipeline<Void, Void> clockPipeline) {
this.classNameRegistryRepository = new ClassNameRegistryRepository(); this.classNameRegistryRepository = new ClassNameRegistryRepository();
// final IsIMonitoringRecordInRange isIMonitoringRecordInRange = new IsIMonitoringRecordInRange(0, 1000); // final IsIMonitoringRecordInRange isIMonitoringRecordInRange = new IsIMonitoringRecordInRange(0, 1000);
...@@ -62,8 +80,8 @@ public class TraceReconstructionAnalysis extends Analysis { ...@@ -62,8 +80,8 @@ public class TraceReconstructionAnalysis extends Analysis {
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class); IFlowRecord.class);
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(); final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
this.throughputFilter = new ThroughputFilter<TraceEventRecords>();
this.traceCounter = new CountingFilter<TraceEventRecords>(); this.traceCounter = new CountingFilter<TraceEventRecords>();
final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection); final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection);
// configure stages // configure stages
...@@ -77,9 +95,12 @@ public class TraceReconstructionAnalysis extends Analysis { ...@@ -77,9 +95,12 @@ public class TraceReconstructionAnalysis extends Analysis {
SingleElementPipe.connect(cache.getOutputPort(), stringBufferFilter.getInputPort()); SingleElementPipe.connect(cache.getOutputPort(), stringBufferFilter.getInputPort());
SingleElementPipe.connect(stringBufferFilter.getOutputPort(), instanceOfFilter.getInputPort()); SingleElementPipe.connect(stringBufferFilter.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceCounter.getInputPort()); SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.throughputFilter.getInputPort());
SingleElementPipe.connect(this.throughputFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(this.traceCounter.getOutputPort(), collector.getInputPort()); SingleElementPipe.connect(this.traceCounter.getOutputPort(), collector.getInputPort());
SpScPipe.connect(clockPipeline.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1);
// fill input ports // fill input ports
dir2RecordsFilter.getInputPort().getPipe().add(new File("src/test/data/Eprints-logs")); dir2RecordsFilter.getInputPort().getPipe().add(new File("src/test/data/Eprints-logs"));
...@@ -91,6 +112,7 @@ public class TraceReconstructionAnalysis extends Analysis { ...@@ -91,6 +112,7 @@ public class TraceReconstructionAnalysis extends Analysis {
pipeline.addIntermediateStage(stringBufferFilter); pipeline.addIntermediateStage(stringBufferFilter);
pipeline.addIntermediateStage(instanceOfFilter); pipeline.addIntermediateStage(instanceOfFilter);
pipeline.addIntermediateStage(traceReconstructionFilter); pipeline.addIntermediateStage(traceReconstructionFilter);
pipeline.addIntermediateStage(this.throughputFilter);
pipeline.addIntermediateStage(this.traceCounter); pipeline.addIntermediateStage(this.traceCounter);
pipeline.setLastStage(collector); pipeline.setLastStage(collector);
return pipeline; return pipeline;
...@@ -120,4 +142,8 @@ public class TraceReconstructionAnalysis extends Analysis { ...@@ -120,4 +142,8 @@ public class TraceReconstructionAnalysis extends Analysis {
public int getNumTraces() { public int getNumTraces() {
return this.traceCounter.getNumElementsPassed(); return this.traceCounter.getNumElementsPassed();
} }
public List<Long> getThroughputs() {
return this.throughputFilter.getThroughputs();
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment