Skip to content
Snippets Groups Projects
Commit bff16273 authored by Christian Wulf's avatar Christian Wulf
Browse files

added performance tests

parent 630507d1
No related branches found
No related tags found
No related merge requests found
Showing
with 323 additions and 48 deletions
...@@ -15,7 +15,12 @@ ...@@ -15,7 +15,12 @@
***************************************************************************/ ***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.traceReading; package teetime.variant.methodcallWithPorts.examples.traceReading;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -74,6 +79,9 @@ public class ChwHomeTcpTraceReadingTest { ...@@ -74,6 +79,9 @@ public class ChwHomeTcpTraceReadingTest {
System.out.println("Median record throughput: " + recordQuintiles.get(0.5) + " records/time unit"); System.out.println("Median record throughput: " + recordQuintiles.get(0.5) + " records/time unit");
assertEquals("#records", EXPECTED_NUM_RECORDS, analysis.getNumRecords()); assertEquals("#records", EXPECTED_NUM_RECORDS, analysis.getNumRecords());
// 08.07.2014 (incl.)
assertThat(recordQuintiles.get(0.5), is(both(greaterThan(3000L)).and(lessThan(3200L))));
} }
} }
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.traceReconstruction;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import teetime.util.ListUtil;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class ChwHomeTcpTraceReconstructionAnalysisTest {
private static final int MIO = 1000000;
private static final int EXPECTED_NUM_TRACES = 10 * MIO;
private static final int EXPECTED_NUM_RECORDS = 21 * EXPECTED_NUM_TRACES + 1;
private StopWatch stopWatch;
@Before
public void before() {
this.stopWatch = new StopWatch();
}
@After
public void after() {
long overallDurationInNs = this.stopWatch.getDurationInNs();
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
}
@Test
public void performAnalysis() {
final TcpTraceReconstructionAnalysis analysis = new TcpTraceReconstructionAnalysis();
analysis.init();
this.stopWatch.start();
try {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(analysis.getRecordThroughputs());
Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(recordThroughputs);
System.out.println("Median record throughput: " + recordQuintiles.get(0.5) + " elements/time unit");
// List<Long> traceThroughputs = ListUtil.removeFirstHalfElements(analysis.getTraceThroughputs());
// Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(traceThroughputs);
// System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit");
assertEquals("#records", EXPECTED_NUM_RECORDS, analysis.getNumRecords());
assertEquals("#traces", EXPECTED_NUM_TRACES, analysis.getNumTraces());
// TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
// assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
//
// TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
// assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
// until 04.07.2014 (incl.)
// Median throughput: 74 elements/time unit
// Duration: 17445 ms
// Median throughput: 78 elements/time unit
// Duration: 16608 ms
// 08.07.2014 (incl.)
assertThat(recordQuintiles.get(0.5), is(both(greaterThan(3000L)).and(lessThan(3200L))));
}
}
...@@ -69,6 +69,9 @@ public class ChwHomeTraceReconstructionAnalysisTest { ...@@ -69,6 +69,9 @@ public class ChwHomeTraceReconstructionAnalysisTest {
analysis.onTerminate(); analysis.onTerminate();
} }
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
assertEquals(50002, analysis.getNumRecords()); assertEquals(50002, analysis.getNumRecords());
assertEquals(2, analysis.getNumTraces()); assertEquals(2, analysis.getNumTraces());
...@@ -77,9 +80,6 @@ public class ChwHomeTraceReconstructionAnalysisTest { ...@@ -77,9 +80,6 @@ public class ChwHomeTraceReconstructionAnalysisTest {
TraceEventRecords trace6886 = analysis.getElementCollection().get(1); TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
} }
@Test @Test
......
...@@ -34,6 +34,10 @@ import teetime.util.StopWatch; ...@@ -34,6 +34,10 @@ import teetime.util.StopWatch;
*/ */
public class ChwWorkTcpTraceReconstructionAnalysisTest { public class ChwWorkTcpTraceReconstructionAnalysisTest {
private static final int MIO = 1000000;
private static final int EXPECTED_NUM_TRACES = 10 * MIO;
private static final int EXPECTED_NUM_RECORDS = 21 * EXPECTED_NUM_TRACES + 1;
private StopWatch stopWatch; private StopWatch stopWatch;
@Before @Before
...@@ -63,8 +67,8 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest { ...@@ -63,8 +67,8 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest {
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit");
// assertEquals(1000, analysis.getNumTraces()); assertEquals(EXPECTED_NUM_RECORDS, analysis.getNumRecords());
assertEquals(1000000, analysis.getNumTraces()); assertEquals(EXPECTED_NUM_TRACES, analysis.getNumTraces());
// TraceEventRecords trace6884 = analysis.getElementCollection().get(0); // TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
// assertEquals(6884, trace6884.getTraceMetadata().getTraceId()); // assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
...@@ -72,9 +76,6 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest { ...@@ -72,9 +76,6 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest {
// TraceEventRecords trace6886 = analysis.getElementCollection().get(1); // TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
// assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); // assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
// assertEquals(21001, analysis.getNumRecords());
assertEquals(21000001, analysis.getNumRecords());
// until 04.07.2014 (inkl.) // until 04.07.2014 (inkl.)
// Median throughput: 74 elements/time unit // Median throughput: 74 elements/time unit
// Duration: 17445 ms // Duration: 17445 ms
......
...@@ -26,6 +26,9 @@ import kieker.common.record.flow.IFlowRecord; ...@@ -26,6 +26,9 @@ import kieker.common.record.flow.IFlowRecord;
public class TcpTraceReconstructionAnalysis extends Analysis { public class TcpTraceReconstructionAnalysis extends Analysis {
private static final int MIO = 1000000;
private static final int TCP_RELAY_MAX_SIZE = 2 * MIO;
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private Thread clockThread; private Thread clockThread;
...@@ -79,13 +82,14 @@ public class TcpTraceReconstructionAnalysis extends Analysis { ...@@ -79,13 +82,14 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
// connect stages // connect stages
SpScPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), 1024); SpScPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), TCP_RELAY_MAX_SIZE);
SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
// SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort()); // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort());
SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); // SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort()); SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 1); SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 1);
...@@ -96,9 +100,9 @@ public class TcpTraceReconstructionAnalysis extends Analysis { ...@@ -96,9 +100,9 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
pipeline.setFirstStage(tcpReader); pipeline.setFirstStage(tcpReader);
pipeline.addIntermediateStage(this.recordCounter); pipeline.addIntermediateStage(this.recordCounter);
pipeline.addIntermediateStage(instanceOfFilter); pipeline.addIntermediateStage(instanceOfFilter);
// pipeline.addIntermediateStage(this.recordThroughputFilter); pipeline.addIntermediateStage(this.recordThroughputFilter);
pipeline.addIntermediateStage(traceReconstructionFilter); pipeline.addIntermediateStage(traceReconstructionFilter);
pipeline.addIntermediateStage(this.traceThroughputFilter); // pipeline.addIntermediateStage(this.traceThroughputFilter);
pipeline.addIntermediateStage(this.traceCounter); pipeline.addIntermediateStage(this.traceCounter);
pipeline.setLastStage(endStage); pipeline.setLastStage(endStage);
return pipeline; return pipeline;
...@@ -108,9 +112,9 @@ public class TcpTraceReconstructionAnalysis extends Analysis { ...@@ -108,9 +112,9 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
public void start() { public void start() {
super.start(); super.start();
this.clockThread.start();
this.clock2Thread.start();
this.workerThread.start(); this.workerThread.start();
this.clockThread.start();
// this.clock2Thread.start();
try { try {
this.workerThread.join(); this.workerThread.join();
...@@ -118,7 +122,7 @@ public class TcpTraceReconstructionAnalysis extends Analysis { ...@@ -118,7 +122,7 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
this.clockThread.interrupt(); this.clockThread.interrupt();
this.clock2Thread.interrupt(); // this.clock2Thread.interrupt();
} }
public List<TraceEventRecords> getElementCollection() { public List<TraceEventRecords> getElementCollection() {
......
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import teetime.util.ListUtil;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import kieker.common.record.IMonitoringRecord;
/**
* @author Christian Wulf
*
* @since 1.10
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest {
private static final int MIO = 1000000;
private static final int EXPECTED_NUM_TRACES = 10 * MIO;
private static final int EXPECTED_NUM_RECORDS = 21 * EXPECTED_NUM_TRACES + 1;
private StopWatch stopWatch;
@Before
public void before() {
this.stopWatch = new StopWatch();
}
@After
public void after() {
long overallDurationInNs = this.stopWatch.getDurationInNs();
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
}
@Test
public void performAnalysisWith1Thread() {
this.performAnalysis(1);
}
@Test
public void performAnalysisWith2Threads() {
this.performAnalysis(2);
}
@Test
public void performAnalysisWith4Threads() {
this.performAnalysis(4);
}
void performAnalysis(final int numWorkerThreads) {
final TcpTraceReconstructionAnalysisWithThreads analysis = new TcpTraceReconstructionAnalysisWithThreads();
analysis.setNumWorkerThreads(numWorkerThreads);
analysis.init();
this.stopWatch.start();
try {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
int maxNumWaits = 0;
for (SpScPipe<IMonitoringRecord> pipe : analysis.getTcpRelayPipes()) {
maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits());
}
System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits);
// System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas());
// System.out.println("Max #trace created: " + analysis.getMaxElementsCreated());
// Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays());
// System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record");
// Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays());
// System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace");
List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(analysis.getRecordThroughputs());
Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(recordThroughputs);
System.out.println("Median record throughput: " + recordQuintiles.get(0.5) + " elements/time unit");
// List<Long> traceThroughputs = ListUtil.removeFirstHalfElements(analysis.getTraceThroughputs());
// Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(traceThroughputs);
// System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit");
// TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
// assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
//
// TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
// assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
assertEquals("#records", EXPECTED_NUM_RECORDS, analysis.getNumRecords());
assertEquals("#traces", EXPECTED_NUM_TRACES, analysis.getNumTraces());
for (Integer count : analysis.getNumTraceMetadatas()) {
assertEquals("#traceMetadata per worker thread", EXPECTED_NUM_TRACES / numWorkerThreads, count.intValue()); // even distribution
}
// 08.07.2014 (incl.)
assertThat(recordQuintiles.get(0.5), is(both(greaterThan(3100L)).and(lessThan(3500L))));
}
public static void main(final String[] args) {
ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest analysis = new ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest();
analysis.before();
try {
analysis.performAnalysisWith1Thread();
} catch (Exception e) {
System.err.println(e);
}
analysis.after();
analysis.before();
try {
analysis.performAnalysisWith2Threads();
} catch (Exception e) {
System.err.println(e);
}
analysis.after();
analysis.before();
try {
analysis.performAnalysisWith4Threads();
} catch (Exception e) {
System.err.println(e);
}
analysis.after();
}
}
...@@ -62,7 +62,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { ...@@ -62,7 +62,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
this.workerThreads = new Thread[this.numWorkerThreads]; this.workerThreads = new Thread[this.numWorkerThreads];
for (int i = 0; i < this.workerThreads.length; i++) { for (int i = 0; i < this.workerThreads.length; i++) {
StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage); StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
this.workerThreads[i] = new Thread(new RunnableStage<IMonitoringRecord>(pipeline)); this.workerThreads[i] = new Thread(new RunnableStage<IMonitoringRecord>(pipeline));
} }
} }
...@@ -128,7 +128,8 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { ...@@ -128,7 +128,8 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private final StageFactory<Counter<IMonitoringRecord>> recordCounterFactory; private final StageFactory<Counter<IMonitoringRecord>> recordCounterFactory;
private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory; private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordDelayFilterFactory;
private final StageFactory<ElementThroughputMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory;
private final StageFactory<InstanceCounter<IMonitoringRecord, TraceMetadata>> traceMetadataCounterFactory; private final StageFactory<InstanceCounter<IMonitoringRecord, TraceMetadata>> traceMetadataCounterFactory;
private final StageFactory<TraceReconstructionFilter> traceReconstructionFilterFactory; private final StageFactory<TraceReconstructionFilter> traceReconstructionFilterFactory;
private final StageFactory<Counter<TraceEventRecords>> traceCounterFactory; private final StageFactory<Counter<TraceEventRecords>> traceCounterFactory;
...@@ -140,7 +141,8 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { ...@@ -140,7 +141,8 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
public TcpTraceReconstructionAnalysisWithThreads() { public TcpTraceReconstructionAnalysisWithThreads() {
try { try {
this.recordCounterFactory = new StageFactory(Counter.class.getConstructor()); this.recordCounterFactory = new StageFactory(Counter.class.getConstructor());
this.recordThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor()); this.recordDelayFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor());
this.recordThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor());
this.traceMetadataCounterFactory = new StageFactory(InstanceCounter.class.getConstructor(Class.class)); this.traceMetadataCounterFactory = new StageFactory(InstanceCounter.class.getConstructor(Class.class));
this.traceReconstructionFilterFactory = new StageFactory(TraceReconstructionFilter.class.getConstructor(ConcurrentHashMapWithDefault.class)); this.traceReconstructionFilterFactory = new StageFactory(TraceReconstructionFilter.class.getConstructor(ConcurrentHashMapWithDefault.class));
this.traceCounterFactory = new StageFactory(Counter.class.getConstructor()); this.traceCounterFactory = new StageFactory(Counter.class.getConstructor());
...@@ -153,11 +155,12 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { ...@@ -153,11 +155,12 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
} }
private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline, private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline,
final StageWithPort<Void, Long> clockStage) { final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) {
// create stages // create stages
Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
// ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create(); ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create();
// ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordDelayFilterFactory.create();
InstanceCounter<IMonitoringRecord, TraceMetadata> traceMetadataCounter = this.traceMetadataCounterFactory.create(TraceMetadata.class); InstanceCounter<IMonitoringRecord, TraceMetadata> traceMetadataCounter = this.traceMetadataCounterFactory.create(TraceMetadata.class);
new InstanceCounter<IMonitoringRecord, TraceMetadata>(TraceMetadata.class); new InstanceCounter<IMonitoringRecord, TraceMetadata>(TraceMetadata.class);
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
...@@ -173,40 +176,29 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { ...@@ -173,40 +176,29 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
this.tcpRelayPipes.add(tcpRelayPipe); this.tcpRelayPipes.add(tcpRelayPipe);
// SysOutFilter<TraceEventRecords> sysout = new SysOutFilter<TraceEventRecords>(tcpRelayPipe); // SysOutFilter<TraceEventRecords> sysout = new SysOutFilter<TraceEventRecords>(tcpRelayPipe);
SpScPipe.connect(clockStage.getOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clock2Stage.getOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
SingleElementPipe.connect(relay.getOutputPort(), recordCounter.getInputPort()); SingleElementPipe.connect(relay.getOutputPort(), recordCounter.getInputPort());
SingleElementPipe.connect(recordCounter.getOutputPort(), traceMetadataCounter.getInputPort()); SingleElementPipe.connect(recordCounter.getOutputPort(), recordThroughputFilter.getInputPort());
SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), traceMetadataCounter.getInputPort());
SingleElementPipe.connect(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort()); SingleElementPipe.connect(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(relay.getOutputPort(), sysout.getInputPort());
// SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort());
// SingleElementPipe.connect(relay.getOutputPort(), recordThroughputFilter.getInputPort());
// SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), endStage.getInputPort());
// // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
// // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceCounter.getInputPort());
// SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort()); // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort()); // SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort());
// SingleElementPipe.connect(traceCounter.getOutputPort(), sysout.getInputPort());
// SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort());
SingleElementPipe.connect(traceCounter.getOutputPort(), endStage.getInputPort()); SingleElementPipe.connect(traceCounter.getOutputPort(), endStage.getInputPort());
// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort());
// SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10);
// SpScPipe.connect(clockStage.getOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clockStage.getOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
// create and configure pipeline // create and configure pipeline
Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>("Worker pipeline"); Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>("Worker pipeline");
pipeline.setFirstStage(relay); pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(recordCounter); pipeline.addIntermediateStage(recordCounter);
// pipeline.addIntermediateStage(recordThroughputFilter); pipeline.addIntermediateStage(recordThroughputFilter);
pipeline.addIntermediateStage(traceMetadataCounter); pipeline.addIntermediateStage(traceMetadataCounter);
pipeline.addIntermediateStage(instanceOfFilter); pipeline.addIntermediateStage(instanceOfFilter);
// pipeline.addIntermediateStage(this.recordThroughputFilter); // pipeline.addIntermediateStage(this.recordThroughputFilter);
pipeline.addIntermediateStage(traceReconstructionFilter); pipeline.addIntermediateStage(traceReconstructionFilter);
pipeline.addIntermediateStage(traceThroughputFilter); // pipeline.addIntermediateStage(traceThroughputFilter);
pipeline.addIntermediateStage(traceCounter); pipeline.addIntermediateStage(traceCounter);
// pipeline.addIntermediateStage(sysout); // pipeline.addIntermediateStage(sysout);
pipeline.setLastStage(endStage); pipeline.setLastStage(endStage);
...@@ -218,9 +210,9 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { ...@@ -218,9 +210,9 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
public void start() { public void start() {
super.start(); super.start();
this.tcpThread.start();
this.clockThread.start(); this.clockThread.start();
// this.clock2Thread.start(); this.clock2Thread.start();
this.tcpThread.start();
for (Thread workerThread : this.workerThreads) { for (Thread workerThread : this.workerThreads) {
workerThread.start(); workerThread.start();
...@@ -261,12 +253,20 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { ...@@ -261,12 +253,20 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
public List<Long> getRecordDelays() { public List<Long> getRecordDelays() {
List<Long> throughputs = new LinkedList<Long>(); List<Long> throughputs = new LinkedList<Long>();
for (ElementDelayMeasuringStage<IMonitoringRecord> stage : this.recordThroughputFilterFactory.getStages()) { for (ElementDelayMeasuringStage<IMonitoringRecord> stage : this.recordDelayFilterFactory.getStages()) {
throughputs.addAll(stage.getDelays()); throughputs.addAll(stage.getDelays());
} }
return throughputs; return throughputs;
} }
public List<Long> getRecordThroughputs() {
List<Long> throughputs = new LinkedList<Long>();
for (ElementThroughputMeasuringStage<IMonitoringRecord> stage : this.recordThroughputFilterFactory.getStages()) {
throughputs.addAll(stage.getThroughputs());
}
return throughputs;
}
public List<Long> getTraceThroughputs() { public List<Long> getTraceThroughputs() {
List<Long> throughputs = new LinkedList<Long>(); List<Long> throughputs = new LinkedList<Long>();
for (ElementThroughputMeasuringStage<TraceEventRecords> stage : this.traceThroughputFilterFactory.getStages()) { for (ElementThroughputMeasuringStage<TraceEventRecords> stage : this.traceThroughputFilterFactory.getStages()) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment