From e5821f6d637d0d2f55048f08613d87cef89e8476 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Tue, 8 Jul 2014 21:17:48 +0200 Subject: [PATCH] added performance tests --- .../ChwHomeTcpTraceReadingTest.java | 8 + ...omeTcpTraceReconstructionAnalysisTest.java | 100 +++++++++++ ...hwHomeTraceReconstructionAnalysisTest.java | 6 +- ...orkTcpTraceReconstructionAnalysisTest.java | 11 +- .../TcpTraceReconstructionAnalysis.java | 26 +-- ...ReconstructionAnalysisWithThreadsTest.java | 162 ++++++++++++++++++ ...raceReconstructionAnalysisWithThreads.java | 58 +++---- 7 files changed, 323 insertions(+), 48 deletions(-) create mode 100644 src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTcpTraceReconstructionAnalysisTest.java create mode 100644 src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/ChwHomeTcpTraceReadingTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/ChwHomeTcpTraceReadingTest.java index e3d18e31..496d83ff 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/ChwHomeTcpTraceReadingTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/ChwHomeTcpTraceReadingTest.java @@ -15,7 +15,12 @@ ***************************************************************************/ package teetime.variant.methodcallWithPorts.examples.traceReading; +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import java.util.List; import java.util.Map; @@ -74,6 +79,9 @@ public class ChwHomeTcpTraceReadingTest { System.out.println("Median record throughput: " + recordQuintiles.get(0.5) + " records/time unit"); assertEquals("#records", EXPECTED_NUM_RECORDS, analysis.getNumRecords()); + + // 08.07.2014 (incl.) + assertThat(recordQuintiles.get(0.5), is(both(greaterThan(3000L)).and(lessThan(3200L)))); } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTcpTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTcpTraceReconstructionAnalysisTest.java new file mode 100644 index 00000000..25cbe39a --- /dev/null +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTcpTraceReconstructionAnalysisTest.java @@ -0,0 +1,100 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.examples.traceReconstruction; + +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import teetime.util.ListUtil; +import teetime.util.StatisticsUtil; +import teetime.util.StopWatch; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class ChwHomeTcpTraceReconstructionAnalysisTest { + + private static final int MIO = 1000000; + private static final int EXPECTED_NUM_TRACES = 10 * MIO; + private static final int EXPECTED_NUM_RECORDS = 21 * EXPECTED_NUM_TRACES + 1; + + private StopWatch stopWatch; + + @Before + public void before() { + this.stopWatch = new StopWatch(); + } + + @After + public void after() { + long overallDurationInNs = this.stopWatch.getDurationInNs(); + System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms"); + } + + @Test + public void performAnalysis() { + final TcpTraceReconstructionAnalysis analysis = new TcpTraceReconstructionAnalysis(); + analysis.init(); + + this.stopWatch.start(); + try { + analysis.start(); + } finally { + this.stopWatch.end(); + analysis.onTerminate(); + } + + List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(analysis.getRecordThroughputs()); + Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(recordThroughputs); + System.out.println("Median record throughput: " + recordQuintiles.get(0.5) + " elements/time unit"); + + // List<Long> traceThroughputs = ListUtil.removeFirstHalfElements(analysis.getTraceThroughputs()); + // Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(traceThroughputs); + // System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit"); + + assertEquals("#records", EXPECTED_NUM_RECORDS, analysis.getNumRecords()); + assertEquals("#traces", EXPECTED_NUM_TRACES, analysis.getNumTraces()); + + // TraceEventRecords trace6884 = analysis.getElementCollection().get(0); + // assertEquals(6884, trace6884.getTraceMetadata().getTraceId()); + // + // TraceEventRecords trace6886 = analysis.getElementCollection().get(1); + // assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); + + // until 04.07.2014 (incl.) + // Median throughput: 74 elements/time unit + // Duration: 17445 ms + // Median throughput: 78 elements/time unit + // Duration: 16608 ms + + // 08.07.2014 (incl.) + assertThat(recordQuintiles.get(0.5), is(both(greaterThan(3000L)).and(lessThan(3200L)))); + } +} diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java index 21df773f..26f2baf4 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java @@ -69,6 +69,9 @@ public class ChwHomeTraceReconstructionAnalysisTest { analysis.onTerminate(); } + Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); + System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); + assertEquals(50002, analysis.getNumRecords()); assertEquals(2, analysis.getNumTraces()); @@ -77,9 +80,6 @@ public class ChwHomeTraceReconstructionAnalysisTest { TraceEventRecords trace6886 = analysis.getElementCollection().get(1); assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); - - Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); } @Test diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java index 3c102237..93882553 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java @@ -34,6 +34,10 @@ import teetime.util.StopWatch; */ public class ChwWorkTcpTraceReconstructionAnalysisTest { + private static final int MIO = 1000000; + private static final int EXPECTED_NUM_TRACES = 10 * MIO; + private static final int EXPECTED_NUM_RECORDS = 21 * EXPECTED_NUM_TRACES + 1; + private StopWatch stopWatch; @Before @@ -63,8 +67,8 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest { Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs()); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); - // assertEquals(1000, analysis.getNumTraces()); - assertEquals(1000000, analysis.getNumTraces()); + assertEquals(EXPECTED_NUM_RECORDS, analysis.getNumRecords()); + assertEquals(EXPECTED_NUM_TRACES, analysis.getNumTraces()); // TraceEventRecords trace6884 = analysis.getElementCollection().get(0); // assertEquals(6884, trace6884.getTraceMetadata().getTraceId()); @@ -72,9 +76,6 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest { // TraceEventRecords trace6886 = analysis.getElementCollection().get(1); // assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); - // assertEquals(21001, analysis.getNumRecords()); - assertEquals(21000001, analysis.getNumRecords()); - // until 04.07.2014 (inkl.) // Median throughput: 74 elements/time unit // Duration: 17445 ms diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java index f94e7579..994d3bc2 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java @@ -26,6 +26,9 @@ import kieker.common.record.flow.IFlowRecord; public class TcpTraceReconstructionAnalysis extends Analysis { + private static final int MIO = 1000000; + private static final int TCP_RELAY_MAX_SIZE = 2 * MIO; + private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); private Thread clockThread; @@ -79,13 +82,14 @@ public class TcpTraceReconstructionAnalysis extends Analysis { EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); // connect stages - SpScPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), 1024); + SpScPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), TCP_RELAY_MAX_SIZE); SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); - // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); - // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort()); - SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); + SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); + SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort()); + // SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); + SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceCounter.getInputPort()); SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort()); SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 1); @@ -96,9 +100,9 @@ public class TcpTraceReconstructionAnalysis extends Analysis { pipeline.setFirstStage(tcpReader); pipeline.addIntermediateStage(this.recordCounter); pipeline.addIntermediateStage(instanceOfFilter); - // pipeline.addIntermediateStage(this.recordThroughputFilter); + pipeline.addIntermediateStage(this.recordThroughputFilter); pipeline.addIntermediateStage(traceReconstructionFilter); - pipeline.addIntermediateStage(this.traceThroughputFilter); + // pipeline.addIntermediateStage(this.traceThroughputFilter); pipeline.addIntermediateStage(this.traceCounter); pipeline.setLastStage(endStage); return pipeline; @@ -108,9 +112,9 @@ public class TcpTraceReconstructionAnalysis extends Analysis { public void start() { super.start(); - this.clockThread.start(); - this.clock2Thread.start(); this.workerThread.start(); + this.clockThread.start(); + // this.clock2Thread.start(); try { this.workerThread.join(); @@ -118,7 +122,7 @@ public class TcpTraceReconstructionAnalysis extends Analysis { throw new IllegalStateException(e); } this.clockThread.interrupt(); - this.clock2Thread.interrupt(); + // this.clock2Thread.interrupt(); } public List<TraceEventRecords> getElementCollection() { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java new file mode 100644 index 00000000..dbb53627 --- /dev/null +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -0,0 +1,162 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads; + +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Before; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +import teetime.util.ListUtil; +import teetime.util.StatisticsUtil; +import teetime.util.StopWatch; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; + +import kieker.common.record.IMonitoringRecord; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest { + + private static final int MIO = 1000000; + private static final int EXPECTED_NUM_TRACES = 10 * MIO; + private static final int EXPECTED_NUM_RECORDS = 21 * EXPECTED_NUM_TRACES + 1; + + private StopWatch stopWatch; + + @Before + public void before() { + this.stopWatch = new StopWatch(); + } + + @After + public void after() { + long overallDurationInNs = this.stopWatch.getDurationInNs(); + System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms"); + } + + @Test + public void performAnalysisWith1Thread() { + this.performAnalysis(1); + } + + @Test + public void performAnalysisWith2Threads() { + this.performAnalysis(2); + } + + @Test + public void performAnalysisWith4Threads() { + this.performAnalysis(4); + } + + void performAnalysis(final int numWorkerThreads) { + final TcpTraceReconstructionAnalysisWithThreads analysis = new TcpTraceReconstructionAnalysisWithThreads(); + analysis.setNumWorkerThreads(numWorkerThreads); + analysis.init(); + + this.stopWatch.start(); + try { + analysis.start(); + } finally { + this.stopWatch.end(); + analysis.onTerminate(); + } + + int maxNumWaits = 0; + for (SpScPipe<IMonitoringRecord> pipe : analysis.getTcpRelayPipes()) { + maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits()); + } + System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits); + + // System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas()); + // System.out.println("Max #trace created: " + analysis.getMaxElementsCreated()); + + // Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays()); + // System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record"); + + // Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays()); + // System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace"); + + List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(analysis.getRecordThroughputs()); + Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(recordThroughputs); + System.out.println("Median record throughput: " + recordQuintiles.get(0.5) + " elements/time unit"); + + // List<Long> traceThroughputs = ListUtil.removeFirstHalfElements(analysis.getTraceThroughputs()); + // Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(traceThroughputs); + // System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit"); + + // TraceEventRecords trace6884 = analysis.getElementCollection().get(0); + // assertEquals(6884, trace6884.getTraceMetadata().getTraceId()); + // + // TraceEventRecords trace6886 = analysis.getElementCollection().get(1); + // assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); + + assertEquals("#records", EXPECTED_NUM_RECORDS, analysis.getNumRecords()); + assertEquals("#traces", EXPECTED_NUM_TRACES, analysis.getNumTraces()); + + for (Integer count : analysis.getNumTraceMetadatas()) { + assertEquals("#traceMetadata per worker thread", EXPECTED_NUM_TRACES / numWorkerThreads, count.intValue()); // even distribution + } + + // 08.07.2014 (incl.) + assertThat(recordQuintiles.get(0.5), is(both(greaterThan(3100L)).and(lessThan(3500L)))); + } + + public static void main(final String[] args) { + ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest analysis = new ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest(); + analysis.before(); + try { + analysis.performAnalysisWith1Thread(); + } catch (Exception e) { + System.err.println(e); + } + analysis.after(); + + analysis.before(); + try { + analysis.performAnalysisWith2Threads(); + } catch (Exception e) { + System.err.println(e); + } + analysis.after(); + + analysis.before(); + try { + analysis.performAnalysisWith4Threads(); + } catch (Exception e) { + System.err.println(e); + } + analysis.after(); + } + +} diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java index 1c59cc3b..f82567ba 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java @@ -62,7 +62,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { this.workerThreads = new Thread[this.numWorkerThreads]; for (int i = 0; i < this.workerThreads.length; i++) { - StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage); + StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); this.workerThreads[i] = new Thread(new RunnableStage<IMonitoringRecord>(pipeline)); } } @@ -128,7 +128,8 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); private final StageFactory<Counter<IMonitoringRecord>> recordCounterFactory; - private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory; + private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordDelayFilterFactory; + private final StageFactory<ElementThroughputMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory; private final StageFactory<InstanceCounter<IMonitoringRecord, TraceMetadata>> traceMetadataCounterFactory; private final StageFactory<TraceReconstructionFilter> traceReconstructionFilterFactory; private final StageFactory<Counter<TraceEventRecords>> traceCounterFactory; @@ -140,7 +141,8 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { public TcpTraceReconstructionAnalysisWithThreads() { try { this.recordCounterFactory = new StageFactory(Counter.class.getConstructor()); - this.recordThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor()); + this.recordDelayFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor()); + this.recordThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor()); this.traceMetadataCounterFactory = new StageFactory(InstanceCounter.class.getConstructor(Class.class)); this.traceReconstructionFilterFactory = new StageFactory(TraceReconstructionFilter.class.getConstructor(ConcurrentHashMapWithDefault.class)); this.traceCounterFactory = new StageFactory(Counter.class.getConstructor()); @@ -153,11 +155,12 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { } private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline, - final StageWithPort<Void, Long> clockStage) { + final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); - // ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create(); + ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create(); + // ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordDelayFilterFactory.create(); InstanceCounter<IMonitoringRecord, TraceMetadata> traceMetadataCounter = this.traceMetadataCounterFactory.create(TraceMetadata.class); new InstanceCounter<IMonitoringRecord, TraceMetadata>(TraceMetadata.class); final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( @@ -173,40 +176,29 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { this.tcpRelayPipes.add(tcpRelayPipe); // SysOutFilter<TraceEventRecords> sysout = new SysOutFilter<TraceEventRecords>(tcpRelayPipe); + SpScPipe.connect(clockStage.getOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10); + SpScPipe.connect(clock2Stage.getOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); + SingleElementPipe.connect(relay.getOutputPort(), recordCounter.getInputPort()); - SingleElementPipe.connect(recordCounter.getOutputPort(), traceMetadataCounter.getInputPort()); + SingleElementPipe.connect(recordCounter.getOutputPort(), recordThroughputFilter.getInputPort()); + SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), traceMetadataCounter.getInputPort()); SingleElementPipe.connect(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort()); - // SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); - // SingleElementPipe.connect(relay.getOutputPort(), sysout.getInputPort()); - // SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort()); - // SingleElementPipe.connect(relay.getOutputPort(), recordThroughputFilter.getInputPort()); - // SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), endStage.getInputPort()); - - // // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); - // // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); - // SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort()); - SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort()); - // SingleElementPipe.connect(traceCounter.getOutputPort(), sysout.getInputPort()); - // SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort()); + SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceCounter.getInputPort()); + // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); + // SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort()); SingleElementPipe.connect(traceCounter.getOutputPort(), endStage.getInputPort()); - // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort()); - - // SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10); - // SpScPipe.connect(clockStage.getOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10); - SpScPipe.connect(clockStage.getOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); // create and configure pipeline Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>("Worker pipeline"); pipeline.setFirstStage(relay); pipeline.addIntermediateStage(recordCounter); - // pipeline.addIntermediateStage(recordThroughputFilter); + pipeline.addIntermediateStage(recordThroughputFilter); pipeline.addIntermediateStage(traceMetadataCounter); pipeline.addIntermediateStage(instanceOfFilter); // pipeline.addIntermediateStage(this.recordThroughputFilter); pipeline.addIntermediateStage(traceReconstructionFilter); - pipeline.addIntermediateStage(traceThroughputFilter); + // pipeline.addIntermediateStage(traceThroughputFilter); pipeline.addIntermediateStage(traceCounter); // pipeline.addIntermediateStage(sysout); pipeline.setLastStage(endStage); @@ -218,9 +210,9 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { public void start() { super.start(); - this.tcpThread.start(); this.clockThread.start(); - // this.clock2Thread.start(); + this.clock2Thread.start(); + this.tcpThread.start(); for (Thread workerThread : this.workerThreads) { workerThread.start(); @@ -261,12 +253,20 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { public List<Long> getRecordDelays() { List<Long> throughputs = new LinkedList<Long>(); - for (ElementDelayMeasuringStage<IMonitoringRecord> stage : this.recordThroughputFilterFactory.getStages()) { + for (ElementDelayMeasuringStage<IMonitoringRecord> stage : this.recordDelayFilterFactory.getStages()) { throughputs.addAll(stage.getDelays()); } return throughputs; } + public List<Long> getRecordThroughputs() { + List<Long> throughputs = new LinkedList<Long>(); + for (ElementThroughputMeasuringStage<IMonitoringRecord> stage : this.recordThroughputFilterFactory.getStages()) { + throughputs.addAll(stage.getThroughputs()); + } + return throughputs; + } + public List<Long> getTraceThroughputs() { List<Long> throughputs = new LinkedList<Long>(); for (ElementThroughputMeasuringStage<TraceEventRecords> stage : this.traceThroughputFilterFactory.getStages()) { -- GitLab