Skip to content
Snippets Groups Projects
Commit c1584508 authored by Christian Wulf's avatar Christian Wulf
Browse files

migrated TcpTraceReconstructionAnalysis to configuration

parent 0fe6dd16
No related branches found
No related tags found
No related merge requests found
......@@ -5,7 +5,6 @@ import java.util.List;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.IStage;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.Clock;
......@@ -26,9 +25,12 @@ public class TcpTraceLoggingExtAnalysisConfiguration extends AnalysisConfigurati
private final IPipeFactory interThreadPipeFactory;
public TcpTraceLoggingExtAnalysisConfiguration() {
intraThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
init();
}
private void init() {
final Pipeline<Distributor<Long>> clockPipeline = this.buildClockPipeline(1000);
addThreadableStage(clockPipeline);
final IStage tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage());
......
......@@ -30,6 +30,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import teetime.framework.Analysis;
import teetime.util.ListUtil;
import teetime.util.StopWatch;
import util.test.StatisticsUtil;
......@@ -60,7 +61,9 @@ public class ChwHomeTcpTraceReconstructionAnalysisTest {
@Test
public void performAnalysis() {
final TcpTraceReconstructionAnalysis analysis = new TcpTraceReconstructionAnalysis();
final TcpTraceReconstructionAnalysisConfiguration configuration = new TcpTraceReconstructionAnalysisConfiguration();
final Analysis analysis = new Analysis(configuration);
analysis.init();
this.stopWatch.start();
......@@ -70,7 +73,7 @@ public class ChwHomeTcpTraceReconstructionAnalysisTest {
this.stopWatch.end();
}
List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(analysis.getRecordThroughputs());
List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(configuration.getRecordThroughputs());
Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(recordThroughputs);
System.out.println("Median record throughput: " + recordQuintiles.get(0.5) + " elements/time unit");
......@@ -78,8 +81,8 @@ public class ChwHomeTcpTraceReconstructionAnalysisTest {
// Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(traceThroughputs);
// System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit");
assertEquals("#records", EXPECTED_NUM_RECORDS, analysis.getNumRecords());
assertEquals("#traces", EXPECTED_NUM_TRACES, analysis.getNumTraces());
assertEquals("#records", EXPECTED_NUM_RECORDS, configuration.getNumRecords());
assertEquals("#traces", EXPECTED_NUM_TRACES, configuration.getNumTraces());
// TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
// assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
......
......@@ -24,6 +24,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import teetime.framework.Analysis;
import teetime.util.StopWatch;
import util.test.StatisticsUtil;
......@@ -53,7 +54,9 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest {
@Test
public void performAnalysis() {
final TcpTraceReconstructionAnalysis analysis = new TcpTraceReconstructionAnalysis();
final TcpTraceReconstructionAnalysisConfiguration configuration = new TcpTraceReconstructionAnalysisConfiguration();
Analysis analysis = new Analysis(configuration);
analysis.init();
this.stopWatch.start();
......@@ -63,11 +66,11 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest {
this.stopWatch.end();
}
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(configuration.getTraceThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit");
assertEquals(EXPECTED_NUM_RECORDS, analysis.getNumRecords());
assertEquals(EXPECTED_NUM_TRACES, analysis.getNumTraces());
assertEquals(EXPECTED_NUM_RECORDS, configuration.getNumRecords());
assertEquals(EXPECTED_NUM_TRACES, configuration.getNumTraces());
// TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
// assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
......
......@@ -3,14 +3,17 @@ package teetime.examples.traceReconstruction;
import java.util.LinkedList;
import java.util.List;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.IStage;
import teetime.framework.RunnableStage;
import teetime.framework.pipe.SingleElementPipe;
import teetime.framework.pipe.SpScPipe;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.Clock;
import teetime.stage.Counter;
import teetime.stage.ElementThroughputMeasuringStage;
import teetime.stage.InstanceOfFilter;
import teetime.stage.Pipeline;
import teetime.stage.basic.Sink;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.io.network.TcpReader;
......@@ -22,7 +25,7 @@ import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.flow.IFlowRecord;
public class TcpTraceReconstructionAnalysis {
public class TcpTraceReconstructionAnalysisConfiguration extends AnalysisConfiguration {
private static final int MIO = 1000000;
private static final int TCP_RELAY_MAX_SIZE = 2 * MIO;
......@@ -40,25 +43,34 @@ public class TcpTraceReconstructionAnalysis {
private ElementThroughputMeasuringStage<IFlowRecord> recordThroughputFilter;
private ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter;
public void init() {
IStage clockStage = this.buildClockPipeline(1000);
private final IPipeFactory intraThreadPipeFactory;
private final IPipeFactory interThreadPipeFactory;
public TcpTraceReconstructionAnalysisConfiguration() {
intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
init();
}
private void init() {
Pipeline<Distributor<Long>> clockStage = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage(clockStage));
IStage clock2Stage = this.buildClockPipeline(2000);
Pipeline<Distributor<Long>> clock2Stage = this.buildClockPipeline(2000);
this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
IStage pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage());
this.workerThread = new Thread(new RunnableStage(pipeline));
}
private IStage buildClockPipeline(final long intervalDelayInMs) {
private Pipeline<Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
Clock clock = new Clock();
clock.setIntervalDelayInMs(intervalDelayInMs);
Distributor<Long> distributor = new Distributor<Long>();
SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
intraThreadPipeFactory.create(clock.getOutputPort(), distributor.getInputPort());
return clock;
return new Pipeline<Distributor<Long>>(clock, distributor);
}
private IStage buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) {
......@@ -74,37 +86,22 @@ public class TcpTraceReconstructionAnalysis {
Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>();
// connect stages
SpScPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), TCP_RELAY_MAX_SIZE);
SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
// SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), this.traceThroughputFilter.getInputPort());
SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
// SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clock2Stage.getNewOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10);
interThreadPipeFactory.create(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), TCP_RELAY_MAX_SIZE);
intraThreadPipeFactory.create(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort());
// intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
// intraThreadPipeFactory.create(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), this.traceThroughputFilter.getInputPort());
intraThreadPipeFactory.create(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
// intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort());
intraThreadPipeFactory.create(this.traceCounter.getOutputPort(), endStage.getInputPort());
interThreadPipeFactory.create(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10);
interThreadPipeFactory.create(clock2Stage.getNewOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10);
return tcpReader;
}
public void start() {
this.workerThread.start();
// this.clockThread.start();
this.clock2Thread.start();
try {
this.workerThread.join();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
this.clockThread.interrupt();
this.clock2Thread.interrupt();
}
public List<TraceEventRecords> getElementCollection() {
return this.elementCollection;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment