Skip to content
Snippets Groups Projects
Commit e5ae5810 authored by Christian Wulf's avatar Christian Wulf
Browse files

migrated TcpTraceReconstructionAnalysis*

parent c1584508
No related branches found
No related tags found
No related merge requests found
......@@ -5,7 +5,6 @@ import java.util.List;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.IStage;
import teetime.framework.RunnableStage;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
......@@ -32,10 +31,6 @@ public class TcpTraceReconstructionAnalysisConfiguration extends AnalysisConfigu
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private Thread clockThread;
private Thread clock2Thread;
private Thread workerThread;
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private Counter<IMonitoringRecord> recordCounter;
......@@ -54,13 +49,13 @@ public class TcpTraceReconstructionAnalysisConfiguration extends AnalysisConfigu
private void init() {
Pipeline<Distributor<Long>> clockStage = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage(clockStage));
addThreadableStage(clockStage);
Pipeline<Distributor<Long>> clock2Stage = this.buildClockPipeline(2000);
this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
addThreadableStage(clock2Stage);
IStage pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage());
this.workerThread = new Thread(new RunnableStage(pipeline));
addThreadableStage(pipeline);
}
private Pipeline<Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
......
......@@ -33,7 +33,6 @@ import org.junit.Test;
import org.junit.runners.MethodSorters;
import teetime.framework.Analysis;
import teetime.framework.pipe.SpScPipe;
import teetime.util.ListUtil;
import teetime.util.StopWatch;
import util.test.StatisticsUtil;
......@@ -79,9 +78,7 @@ public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest {
}
void performAnalysis(final int numWorkerThreads) {
final TcpTraceReconstructionAnalysisWithThreadsConfiguration configuration = new TcpTraceReconstructionAnalysisWithThreadsConfiguration();
configuration.setNumWorkerThreads(numWorkerThreads);
configuration.buildConfiguration();
final TcpTraceReconstructionAnalysisWithThreadsConfiguration configuration = new TcpTraceReconstructionAnalysisWithThreadsConfiguration(numWorkerThreads);
Analysis analysis = new Analysis(configuration);
analysis.init();
......@@ -93,11 +90,7 @@ public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest {
this.stopWatch.end();
}
int maxNumWaits = 0;
for (SpScPipe pipe : configuration.getTcpRelayPipes()) {
maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits());
}
System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits);
System.out.println("max #waits of TcpRelayPipes: " + configuration.getMaxNumWaits());
// System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas());
// System.out.println("Max #trace created: " + analysis.getMaxElementsCreated());
......
......@@ -28,7 +28,6 @@ import org.junit.Test;
import org.junit.runners.MethodSorters;
import teetime.framework.Analysis;
import teetime.framework.pipe.SpScPipe;
import teetime.util.ListUtil;
import teetime.util.StopWatch;
import util.test.StatisticsUtil;
......@@ -87,11 +86,9 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
// Duration: 22373 ms
void performAnalysis(final int numWorkerThreads) {
final TcpTraceReconstructionAnalysisWithThreadsConfiguration configuration = new TcpTraceReconstructionAnalysisWithThreadsConfiguration();
configuration.setNumWorkerThreads(numWorkerThreads);
configuration.buildConfiguration();
final TcpTraceReconstructionAnalysisWithThreadsConfiguration configuration = new TcpTraceReconstructionAnalysisWithThreadsConfiguration(numWorkerThreads);
Analysis analysis = new Analysis(configuration);
final Analysis analysis = new Analysis(configuration);
analysis.init();
this.stopWatch.start();
......@@ -101,11 +98,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
this.stopWatch.end();
}
int maxNumWaits = 0;
for (SpScPipe pipe : configuration.getTcpRelayPipes()) {
maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits());
}
System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits);
System.out.println("max #waits of TcpRelayPipes: " + configuration.getMaxNumWaits());
// System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas());
// System.out.println("Max #trace created: " + analysis.getMaxElementsCreated());
......
......@@ -9,7 +9,10 @@ import java.util.List;
import teetime.framework.AbstractStage;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.IStage;
import teetime.framework.pipe.SingleElementPipe;
import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.framework.pipe.SpScPipe;
import teetime.stage.Clock;
import teetime.stage.Counter;
......@@ -17,6 +20,7 @@ import teetime.stage.ElementDelayMeasuringStage;
import teetime.stage.ElementThroughputMeasuringStage;
import teetime.stage.InstanceCounter;
import teetime.stage.InstanceOfFilter;
import teetime.stage.Pipeline;
import teetime.stage.Relay;
import teetime.stage.basic.Sink;
import teetime.stage.basic.distributor.Distributor;
......@@ -38,7 +42,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private int numWorkerThreads;
private final int numWorkerThreads;
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
......@@ -50,11 +54,14 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
private final StageFactory<Counter<TraceEventRecords>> traceCounterFactory;
private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory;
private final List<SpScPipe> tcpRelayPipes = new LinkedList<SpScPipe>();
private final List<IPipe> tcpRelayPipes = new LinkedList<IPipe>();
private final IPipeFactory intraThreadPipeFactory;
private final IPipeFactory interThreadPipeFactory;
@SuppressWarnings({ "rawtypes", "unchecked" })
public TcpTraceReconstructionAnalysisWithThreadsConfiguration() {
public TcpTraceReconstructionAnalysisWithThreadsConfiguration(final int numWorkerThreads) {
super();
this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, numWorkerThreads);
try {
this.recordCounterFactory = new StageFactory(Counter.class.getConstructor());
......@@ -69,43 +76,46 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
} catch (SecurityException e) {
throw new IllegalArgumentException(e);
}
intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
init();
}
public void buildConfiguration() {
final IStage tcpPipeline = this.buildTcpPipeline();
this.getFiniteProducerStages().add(tcpPipeline);
private void init() {
Pipeline<Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
addThreadableStage(tcpPipeline);
final IStage clockStage = this.buildClockPipeline(1000);
this.getInfiniteProducerStages().add(clockStage);
Pipeline<Distributor<Long>> clockStage = this.buildClockPipeline(1000);
addThreadableStage(clockStage);
final IStage clock2Stage = this.buildClockPipeline(2000);
this.getInfiniteProducerStages().add(clock2Stage);
Pipeline<Distributor<Long>> clock2Stage = this.buildClockPipeline(2000);
addThreadableStage(clock2Stage);
this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
for (int i = 0; i < this.numWorkerThreads; i++) {
IStage pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), clock2Stage.getLastStage());
this.getConsumerStages().add(pipeline);
addThreadableStage(pipeline);
}
}
private IStage buildTcpPipeline() {
private Pipeline<Distributor<IMonitoringRecord>> buildTcpPipeline() {
TcpReader tcpReader = new TcpReader();
Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
intraThreadPipeFactory.create(tcpReader.getOutputPort(), distributor.getInputPort());
return tcpReader;
return new Pipeline<Distributor<IMonitoringRecord>>(tcpReader, distributor);
}
private IStage buildClockPipeline(final long intervalDelayInMs) {
private Pipeline<Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
Clock clock = new Clock();
clock.setInitialDelayInMs(intervalDelayInMs);
clock.setIntervalDelayInMs(intervalDelayInMs);
Distributor<Long> distributor = new Distributor<Long>();
SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
intraThreadPipeFactory.create(clock.getOutputPort(), distributor.getInputPort());
return clock;
return new Pipeline<Distributor<Long>>(clock, distributor);
}
private static class StageFactory<T extends AbstractStage> {
......@@ -155,22 +165,22 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
// EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
// connect stages
SpScPipe tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
IPipe tcpRelayPipe = interThreadPipeFactory.create(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
this.tcpRelayPipes.add(tcpRelayPipe);
// SysOutFilter<TraceEventRecords> sysout = new SysOutFilter<TraceEventRecords>(tcpRelayPipe);
SpScPipe.connect(clockStage.getNewOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clock2Stage.getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
interThreadPipeFactory.create(clockStage.getNewOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10);
interThreadPipeFactory.create(clock2Stage.getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
SingleElementPipe.connect(relay.getOutputPort(), recordCounter.getInputPort());
SingleElementPipe.connect(recordCounter.getOutputPort(), recordThroughputFilter.getInputPort());
SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), traceMetadataCounter.getInputPort());
SingleElementPipe.connect(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), traceCounter.getInputPort());
// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
// SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort());
SingleElementPipe.connect(traceCounter.getOutputPort(), endStage.getInputPort());
intraThreadPipeFactory.create(relay.getOutputPort(), recordCounter.getInputPort());
intraThreadPipeFactory.create(recordCounter.getOutputPort(), recordThroughputFilter.getInputPort());
intraThreadPipeFactory.create(recordThroughputFilter.getOutputPort(), traceMetadataCounter.getInputPort());
intraThreadPipeFactory.create(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort());
intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), traceCounter.getInputPort());
// intraThreadPipeFactory.create(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
// intraThreadPipeFactory.create(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort());
intraThreadPipeFactory.create(traceCounter.getOutputPort(), endStage.getInputPort());
return relay;
}
......@@ -196,7 +206,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
}
public List<Long> getRecordDelays() {
List<Long> throughputs = new LinkedList<Long>();
final List<Long> throughputs = new LinkedList<Long>();
for (ElementDelayMeasuringStage<IMonitoringRecord> stage : this.recordDelayFilterFactory.getStages()) {
throughputs.addAll(stage.getDelays());
}
......@@ -227,18 +237,19 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
return numTraceMetadatas;
}
public List<SpScPipe> getTcpRelayPipes() {
return this.tcpRelayPipes;
public int getMaxNumWaits() {
int maxNumWaits = 0;
for (IPipe pipe : this.tcpRelayPipes) {
SpScPipe interThreadPipe = (SpScPipe) pipe;
maxNumWaits = Math.max(maxNumWaits, interThreadPipe.getNumWaits());
}
return maxNumWaits;
}
public int getNumWorkerThreads() {
return this.numWorkerThreads;
}
public void setNumWorkerThreads(final int numWorkerThreads) {
this.numWorkerThreads = numWorkerThreads;
}
public int getMaxElementsCreated() {
return this.traceId2trace.getMaxElements();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment