Skip to content
Snippets Groups Projects
Commit bc876530 authored by Christian Wulf's avatar Christian Wulf
Browse files

added delay and throughput measuring stages

parent eba77d61
No related branches found
No related tags found
No related merge requests found
Showing
with 185 additions and 67 deletions
......@@ -8,5 +8,5 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
#teetime.level = ALL
teetime.variant.methodcallWithPorts.framework.core.level = ALL
#teetime.variant.methodcallWithPorts.stage.level = ALL
teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
\ No newline at end of file
teetime.variant.methodcallWithPorts.stage.level = WARNING
teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
......@@ -81,8 +81,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
@Override
public void executeWithPorts() {
this.logger.debug("Executing stage...");
// StageWithPort<?, ?> headStage = this.currentHeads.next();
StageWithPort<?, ?> headStage = this.stages[this.firstStageIndex];
......@@ -187,8 +185,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
this.firstStage = null;
this.intermediateStages.clear();
this.lastStage = null;
System.out.println("cleaned up");
}
}
......@@ -25,7 +25,7 @@ public class Clock extends ProducerStage<Void, Long> {
this.sleep(this.intervalDelayInMs);
}
this.logger.debug("Emitting timestamp");
// this.logger.debug("Emitting timestamp");
this.send(this.getCurrentTimeInNs());
}
......
package teetime.variant.methodcallWithPorts.stage;
import java.util.LinkedList;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
public class ElementDelayMeasuringStage<T> extends ConsumerStage<T, T> {
private final InputPort<Long> triggerInputPort = new InputPort<Long>(this);
private long numPassedElements;
private long lastTimestampInNs;
private final List<Long> delays = new LinkedList<Long>();
@Override
protected void execute5(final T element) {
Long timestampInNs = this.triggerInputPort.receive();
if (timestampInNs != null) {
this.computeElementDelay(System.nanoTime());
}
this.numPassedElements++;
this.send(element);
}
@Override
public void onStart() {
this.resetTimestamp(System.nanoTime());
super.onStart();
}
private void computeElementDelay(final Long timestampInNs) {
long diffInNs = timestampInNs - this.lastTimestampInNs;
if (this.numPassedElements > 0) {
long delayInNsPerElement = diffInNs / this.numPassedElements;
this.delays.add(delayInNsPerElement);
this.logger.info("Delay: " + delayInNsPerElement + " time units/element");
this.resetTimestamp(timestampInNs);
}
}
private void resetTimestamp(final Long timestampInNs) {
this.numPassedElements = 0;
this.lastTimestampInNs = timestampInNs;
}
public List<Long> getDelays() {
return this.delays;
}
public InputPort<Long> getTriggerInputPort() {
return this.triggerInputPort;
}
}
......@@ -2,17 +2,16 @@ package teetime.variant.methodcallWithPorts.stage;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
public class ThroughputFilter<T> extends ConsumerStage<T, T> {
public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
private final InputPort<Long> triggerInputPort = new InputPort<Long>(this);
private long numPassedElements;
private long timestamp;
private long lastTimestampInNs;
private final List<Long> throughputs = new LinkedList<Long>();
......@@ -20,8 +19,7 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> {
protected void execute5(final T element) {
Long timestampInNs = this.triggerInputPort.receive();
if (timestampInNs != null) {
this.computeThroughput();
this.resetTimestamp();
this.computeElementThroughput(System.nanoTime());
}
this.numPassedElements++;
this.send(element);
......@@ -29,24 +27,24 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> {
@Override
public void onStart() {
this.resetTimestamp();
this.resetTimestamp(System.nanoTime());
super.onStart();
}
private void computeThroughput() {
long diffInNs = System.nanoTime() - this.timestamp;
long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
long throughputPerMs = this.numPassedElements / diffInMs;
this.throughputs.add(throughputPerMs);
// this.logger.info("Throughput: " + throughputPerMs + " elements/ms");
private void computeElementThroughput(final Long timestampInNs) {
long diffInNs = timestampInNs - this.lastTimestampInNs;
if (diffInNs > 0) {
long throughputInNsPerElement = this.numPassedElements / diffInNs;
this.throughputs.add(throughputInNsPerElement);
this.logger.info("Throughput: " + throughputInNsPerElement + " elements/time unit");
// long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
// long throughputPerSec = this.numPassedElements / diffInSec;
this.resetTimestamp(timestampInNs);
}
}
private void resetTimestamp() {
private void resetTimestamp(final Long timestampInNs) {
this.numPassedElements = 0;
this.timestamp = System.nanoTime();
this.lastTimestampInNs = timestampInNs;
}
public List<Long> getThroughputs() {
......
......@@ -79,7 +79,7 @@ public class ChwHomeTraceReconstructionAnalysisTest {
assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
}
@Test
......@@ -106,7 +106,7 @@ public class ChwHomeTraceReconstructionAnalysisTest {
assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
assertThat(quintiles.get(0.5), is(both(greaterThan(1700l)).and(lessThan(1900l))));
}
......@@ -135,7 +135,7 @@ public class ChwHomeTraceReconstructionAnalysisTest {
assertEquals(1, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
}
}
......@@ -61,7 +61,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest {
}
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
// assertEquals(1000, analysis.getNumTraces());
assertEquals(1000000, analysis.getNumTraces());
......
......@@ -79,7 +79,7 @@ public class ChwWorkTraceReconstructionAnalysisTest {
assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
}
@Test
......@@ -106,7 +106,7 @@ public class ChwWorkTraceReconstructionAnalysisTest {
assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l))));
}
......@@ -135,7 +135,7 @@ public class ChwWorkTraceReconstructionAnalysisTest {
assertEquals(1, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
}
}
......@@ -76,7 +76,7 @@ public class NieWorkKiekerTraceReconstructionAnalysisTest {
assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Mean throughput: " + quintiles.get(0.5) + " time units/element");
}
@Test
......@@ -102,7 +102,7 @@ public class NieWorkKiekerTraceReconstructionAnalysisTest {
assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Mean throughput: " + quintiles.get(0.5) + " time units/element");
assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l))));
}
......@@ -130,7 +130,7 @@ public class NieWorkKiekerTraceReconstructionAnalysisTest {
assertEquals(1, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Mean throughput: " + quintiles.get(0.5) + " time units/element");
}
}
......@@ -12,9 +12,9 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.ThroughputFilter;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......@@ -34,8 +34,8 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
private CountingFilter<TraceEventRecords> traceCounter;
private ThroughputFilter<IFlowRecord> recordThroughputFilter;
private ThroughputFilter<TraceEventRecords> traceThroughputFilter;
private ElementThroughputMeasuringStage<IFlowRecord> recordThroughputFilter;
private ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter;
@Override
public void init() {
......@@ -70,9 +70,9 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
this.recordCounter = new CountingFilter<IMonitoringRecord>();
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
this.recordThroughputFilter = new ThroughputFilter<IFlowRecord>();
this.recordThroughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
this.traceThroughputFilter = new ThroughputFilter<TraceEventRecords>();
this.traceThroughputFilter = new ElementThroughputMeasuringStage<TraceEventRecords>();
this.traceCounter = new CountingFilter<TraceEventRecords>();
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
......
......@@ -14,8 +14,8 @@ import teetime.variant.methodcallWithPorts.stage.Cache;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.ThroughputFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......@@ -40,7 +40,7 @@ public class TraceReconstructionAnalysis extends Analysis {
private CountingFilter<TraceEventRecords> traceCounter;
private ThroughputFilter<IFlowRecord> throughputFilter;
private ElementThroughputMeasuringStage<IFlowRecord> throughputFilter;
private File inputDir;
......@@ -72,7 +72,7 @@ public class TraceReconstructionAnalysis extends Analysis {
final StringBufferFilter<IMonitoringRecord> stringBufferFilter = new StringBufferFilter<IMonitoringRecord>();
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
this.throughputFilter = new ThroughputFilter<IFlowRecord>();
this.throughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
this.traceCounter = new CountingFilter<TraceEventRecords>();
final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection);
......
......@@ -80,8 +80,11 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays());
System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record");
Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays());
System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace");
// assertEquals(1000, analysis.getNumTraces());
assertEquals(1000000, analysis.getNumTraces());
......
package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
......@@ -12,10 +15,10 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.ThroughputFilter;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......@@ -35,13 +38,6 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
private Thread clock2Thread;
private Thread[] workerThreads;
private CountingFilter<IMonitoringRecord> recordCounter;
private CountingFilter<TraceEventRecords> traceCounter;
private ThroughputFilter<IFlowRecord> recordThroughputFilter;
private ThroughputFilter<TraceEventRecords> traceThroughputFilter;
private SpScPipe<IMonitoringRecord> tcpRelayPipe;
private int numWorkerThreads;
......@@ -81,6 +77,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) {
Clock clock = new Clock();
clock.setInitialDelayInMs(intervalDelayInMs);
clock.setIntervalDelayInMs(intervalDelayInMs);
Distributor<Long> distributor = new Distributor<Long>();
......@@ -93,18 +90,66 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
return pipeline;
}
private static class StageFactory<T extends StageWithPort<?, ?>> {
private final Constructor<T> constructor;
private final List<T> stages = new ArrayList<T>();
public StageFactory(final Constructor<T> constructor) {
this.constructor = constructor;
}
public T create(final Object... initargs) {
try {
T stage = this.constructor.newInstance(initargs);
this.stages.add(stage);
return stage;
} catch (InstantiationException e) {
throw new IllegalStateException(e);
} catch (IllegalAccessException e) {
throw new IllegalStateException(e);
} catch (IllegalArgumentException e) {
throw new IllegalStateException(e);
} catch (InvocationTargetException e) {
throw new IllegalStateException(e);
}
}
public List<T> getStages() {
return this.stages;
}
}
private final StageFactory<CountingFilter<IMonitoringRecord>> recordCounterFactory;
private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory;
private final StageFactory<CountingFilter<TraceEventRecords>> traceCounterFactory;
private final StageFactory<ElementDelayMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory;
public TcpTraceReconstructionAnalysisWithThreads() {
try {
this.recordCounterFactory = new StageFactory(CountingFilter.class.getConstructor());
this.recordThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor());
this.traceCounterFactory = new StageFactory(CountingFilter.class.getConstructor());
this.traceThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor());
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(e);
} catch (SecurityException e) {
throw new IllegalArgumentException(e);
}
}
private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline,
final StageWithPort<Void, Long> clockStage,
final StageWithPort<Void, Long> clock2Stage) {
// create stages
Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
this.recordCounter = new CountingFilter<IMonitoringRecord>();
CountingFilter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
this.recordThroughputFilter = new ThroughputFilter<IFlowRecord>();
ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
this.traceThroughputFilter = new ThroughputFilter<TraceEventRecords>();
this.traceCounter = new CountingFilter<TraceEventRecords>();
CountingFilter<TraceEventRecords> traceCounter = this.traceCounterFactory.create();
ElementDelayMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create();
// EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
......@@ -118,7 +163,9 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
// SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(relay.getOutputPort(), sysout.getInputPort());
// SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort());
SingleElementPipe.connect(relay.getOutputPort(), endStage.getInputPort());
SingleElementPipe.connect(relay.getOutputPort(), recordThroughputFilter.getInputPort());
SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), endStage.getInputPort());
// // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
// // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
......@@ -127,15 +174,15 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
// // SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort());
// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10);
// SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10);
// SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10);
SpScPipe.connect(clockStage.getOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10);
// SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10);
// create and configure pipeline
// Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
Pipeline<IMonitoringRecord, IMonitoringRecord> pipeline = new Pipeline<IMonitoringRecord, IMonitoringRecord>();
pipeline.setFirstStage(relay);
// pipeline.addIntermediateStage(this.recordCounter);
pipeline.addIntermediateStage(recordThroughputFilter);
// pipeline.addIntermediateStage(sysout);
// pipeline.addIntermediateStage(instanceOfFilter);
// pipeline.addIntermediateStage(this.recordThroughputFilter);
......@@ -151,7 +198,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
super.start();
this.tcpThread.start();
// this.clockThread.start();
this.clockThread.start();
// this.clock2Thread.start();
for (Thread workerThread : this.workerThreads) {
......@@ -176,19 +223,35 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
}
public int getNumRecords() {
return this.recordCounter.getNumElementsPassed();
int sum = 0;
for (CountingFilter<IMonitoringRecord> stage : this.recordCounterFactory.getStages()) {
sum += stage.getNumElementsPassed();
}
return sum;
}
public int getNumTraces() {
return this.traceCounter.getNumElementsPassed();
int sum = 0;
for (CountingFilter<TraceEventRecords> stage : this.traceCounterFactory.getStages()) {
sum += stage.getNumElementsPassed();
}
return sum;
}
public List<Long> getRecordThroughputs() {
return this.recordThroughputFilter.getThroughputs();
public List<Long> getRecordDelays() {
List<Long> throughputs = new LinkedList<Long>();
for (ElementDelayMeasuringStage<IMonitoringRecord> stage : this.recordThroughputFilterFactory.getStages()) {
throughputs.addAll(stage.getDelays());
}
return throughputs;
}
public List<Long> getTraceThroughputs() {
return this.traceThroughputFilter.getThroughputs();
public List<Long> getTraceDelays() {
List<Long> throughputs = new LinkedList<Long>();
for (ElementDelayMeasuringStage<TraceEventRecords> stage : this.traceThroughputFilterFactory.getStages()) {
throughputs.addAll(stage.getDelays());
}
return throughputs;
}
public SpScPipe<IMonitoringRecord> getTcpRelayPipe() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment