diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysisConfiguration.java b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysisConfiguration.java index 9b2a5e47a899ac4b66fc770565cfcc60d6261fdf..7d846870f8ca05535124bb6bf1bc0d3de52dd226 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysisConfiguration.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysisConfiguration.java @@ -5,7 +5,6 @@ import java.util.List; import teetime.framework.AnalysisConfiguration; import teetime.framework.IStage; -import teetime.framework.RunnableStage; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; @@ -32,10 +31,6 @@ public class TcpTraceReconstructionAnalysisConfiguration extends AnalysisConfigu private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); - private Thread clockThread; - private Thread clock2Thread; - private Thread workerThread; - private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); private Counter<IMonitoringRecord> recordCounter; @@ -54,13 +49,13 @@ public class TcpTraceReconstructionAnalysisConfiguration extends AnalysisConfigu private void init() { Pipeline<Distributor<Long>> clockStage = this.buildClockPipeline(1000); - this.clockThread = new Thread(new RunnableStage(clockStage)); + addThreadableStage(clockStage); Pipeline<Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); - this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); + addThreadableStage(clock2Stage); IStage pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage()); - this.workerThread = new Thread(new RunnableStage(pipeline)); + addThreadableStage(pipeline); } private Pipeline<Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { diff --git a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java index 5e3af5a5935d97cb2dd7e6ea0dfd37712327b1ba..aa4b4f0bd699b30e9378dccd61d28ea5c4ad8151 100644 --- a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java +++ b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -33,7 +33,6 @@ import org.junit.Test; import org.junit.runners.MethodSorters; import teetime.framework.Analysis; -import teetime.framework.pipe.SpScPipe; import teetime.util.ListUtil; import teetime.util.StopWatch; import util.test.StatisticsUtil; @@ -79,9 +78,7 @@ public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest { } void performAnalysis(final int numWorkerThreads) { - final TcpTraceReconstructionAnalysisWithThreadsConfiguration configuration = new TcpTraceReconstructionAnalysisWithThreadsConfiguration(); - configuration.setNumWorkerThreads(numWorkerThreads); - configuration.buildConfiguration(); + final TcpTraceReconstructionAnalysisWithThreadsConfiguration configuration = new TcpTraceReconstructionAnalysisWithThreadsConfiguration(numWorkerThreads); Analysis analysis = new Analysis(configuration); analysis.init(); @@ -93,11 +90,7 @@ public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest { this.stopWatch.end(); } - int maxNumWaits = 0; - for (SpScPipe pipe : configuration.getTcpRelayPipes()) { - maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits()); - } - System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits); + System.out.println("max #waits of TcpRelayPipes: " + configuration.getMaxNumWaits()); // System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas()); // System.out.println("Max #trace created: " + analysis.getMaxElementsCreated()); diff --git a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java index 3082beaf4fc092e57df7728ec4c9c55ab0fce79f..b76f39fca0a7808c40da88367fe71e20225fcd5a 100644 --- a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java +++ b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -28,7 +28,6 @@ import org.junit.Test; import org.junit.runners.MethodSorters; import teetime.framework.Analysis; -import teetime.framework.pipe.SpScPipe; import teetime.util.ListUtil; import teetime.util.StopWatch; import util.test.StatisticsUtil; @@ -87,11 +86,9 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { // Duration: 22373 ms void performAnalysis(final int numWorkerThreads) { - final TcpTraceReconstructionAnalysisWithThreadsConfiguration configuration = new TcpTraceReconstructionAnalysisWithThreadsConfiguration(); - configuration.setNumWorkerThreads(numWorkerThreads); - configuration.buildConfiguration(); + final TcpTraceReconstructionAnalysisWithThreadsConfiguration configuration = new TcpTraceReconstructionAnalysisWithThreadsConfiguration(numWorkerThreads); - Analysis analysis = new Analysis(configuration); + final Analysis analysis = new Analysis(configuration); analysis.init(); this.stopWatch.start(); @@ -101,11 +98,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { this.stopWatch.end(); } - int maxNumWaits = 0; - for (SpScPipe pipe : configuration.getTcpRelayPipes()) { - maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits()); - } - System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits); + System.out.println("max #waits of TcpRelayPipes: " + configuration.getMaxNumWaits()); // System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas()); // System.out.println("Max #trace created: " + analysis.getMaxElementsCreated()); diff --git a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java index 15c606b430db830a75fbb960537bc8521a7ab280..055a7895347399329312cfbced4f20cafd615817 100644 --- a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java +++ b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java @@ -9,7 +9,10 @@ import java.util.List; import teetime.framework.AbstractStage; import teetime.framework.AnalysisConfiguration; import teetime.framework.IStage; -import teetime.framework.pipe.SingleElementPipe; +import teetime.framework.pipe.IPipe; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.pipe.SpScPipe; import teetime.stage.Clock; import teetime.stage.Counter; @@ -17,6 +20,7 @@ import teetime.stage.ElementDelayMeasuringStage; import teetime.stage.ElementThroughputMeasuringStage; import teetime.stage.InstanceCounter; import teetime.stage.InstanceOfFilter; +import teetime.stage.Pipeline; import teetime.stage.Relay; import teetime.stage.basic.Sink; import teetime.stage.basic.distributor.Distributor; @@ -38,7 +42,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); - private int numWorkerThreads; + private final int numWorkerThreads; private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); @@ -50,11 +54,14 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal private final StageFactory<Counter<TraceEventRecords>> traceCounterFactory; private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory; - private final List<SpScPipe> tcpRelayPipes = new LinkedList<SpScPipe>(); + private final List<IPipe> tcpRelayPipes = new LinkedList<IPipe>(); + private final IPipeFactory intraThreadPipeFactory; + private final IPipeFactory interThreadPipeFactory; @SuppressWarnings({ "rawtypes", "unchecked" }) - public TcpTraceReconstructionAnalysisWithThreadsConfiguration() { + public TcpTraceReconstructionAnalysisWithThreadsConfiguration(final int numWorkerThreads) { super(); + this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, numWorkerThreads); try { this.recordCounterFactory = new StageFactory(Counter.class.getConstructor()); @@ -69,43 +76,46 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal } catch (SecurityException e) { throw new IllegalArgumentException(e); } + + intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + init(); } - public void buildConfiguration() { - final IStage tcpPipeline = this.buildTcpPipeline(); - this.getFiniteProducerStages().add(tcpPipeline); + private void init() { + Pipeline<Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + addThreadableStage(tcpPipeline); - final IStage clockStage = this.buildClockPipeline(1000); - this.getInfiniteProducerStages().add(clockStage); + Pipeline<Distributor<Long>> clockStage = this.buildClockPipeline(1000); + addThreadableStage(clockStage); - final IStage clock2Stage = this.buildClockPipeline(2000); - this.getInfiniteProducerStages().add(clock2Stage); + Pipeline<Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); + addThreadableStage(clock2Stage); - this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); for (int i = 0; i < this.numWorkerThreads; i++) { IStage pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), clock2Stage.getLastStage()); - this.getConsumerStages().add(pipeline); + addThreadableStage(pipeline); } } - private IStage buildTcpPipeline() { + private Pipeline<Distributor<IMonitoringRecord>> buildTcpPipeline() { TcpReader tcpReader = new TcpReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); - SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); + intraThreadPipeFactory.create(tcpReader.getOutputPort(), distributor.getInputPort()); - return tcpReader; + return new Pipeline<Distributor<IMonitoringRecord>>(tcpReader, distributor); } - private IStage buildClockPipeline(final long intervalDelayInMs) { + private Pipeline<Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); clock.setInitialDelayInMs(intervalDelayInMs); clock.setIntervalDelayInMs(intervalDelayInMs); Distributor<Long> distributor = new Distributor<Long>(); - SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); + intraThreadPipeFactory.create(clock.getOutputPort(), distributor.getInputPort()); - return clock; + return new Pipeline<Distributor<Long>>(clock, distributor); } private static class StageFactory<T extends AbstractStage> { @@ -155,22 +165,22 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal // EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>(); // connect stages - SpScPipe tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); + IPipe tcpRelayPipe = interThreadPipeFactory.create(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); this.tcpRelayPipes.add(tcpRelayPipe); // SysOutFilter<TraceEventRecords> sysout = new SysOutFilter<TraceEventRecords>(tcpRelayPipe); - SpScPipe.connect(clockStage.getNewOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10); - SpScPipe.connect(clock2Stage.getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); + interThreadPipeFactory.create(clockStage.getNewOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10); + interThreadPipeFactory.create(clock2Stage.getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); - SingleElementPipe.connect(relay.getOutputPort(), recordCounter.getInputPort()); - SingleElementPipe.connect(recordCounter.getOutputPort(), recordThroughputFilter.getInputPort()); - SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), traceMetadataCounter.getInputPort()); - SingleElementPipe.connect(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort()); - SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), traceCounter.getInputPort()); - // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); - // SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort()); - SingleElementPipe.connect(traceCounter.getOutputPort(), endStage.getInputPort()); + intraThreadPipeFactory.create(relay.getOutputPort(), recordCounter.getInputPort()); + intraThreadPipeFactory.create(recordCounter.getOutputPort(), recordThroughputFilter.getInputPort()); + intraThreadPipeFactory.create(recordThroughputFilter.getOutputPort(), traceMetadataCounter.getInputPort()); + intraThreadPipeFactory.create(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort()); + intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), traceCounter.getInputPort()); + // intraThreadPipeFactory.create(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); + // intraThreadPipeFactory.create(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort()); + intraThreadPipeFactory.create(traceCounter.getOutputPort(), endStage.getInputPort()); return relay; } @@ -196,7 +206,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal } public List<Long> getRecordDelays() { - List<Long> throughputs = new LinkedList<Long>(); + final List<Long> throughputs = new LinkedList<Long>(); for (ElementDelayMeasuringStage<IMonitoringRecord> stage : this.recordDelayFilterFactory.getStages()) { throughputs.addAll(stage.getDelays()); } @@ -227,18 +237,19 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal return numTraceMetadatas; } - public List<SpScPipe> getTcpRelayPipes() { - return this.tcpRelayPipes; + public int getMaxNumWaits() { + int maxNumWaits = 0; + for (IPipe pipe : this.tcpRelayPipes) { + SpScPipe interThreadPipe = (SpScPipe) pipe; + maxNumWaits = Math.max(maxNumWaits, interThreadPipe.getNumWaits()); + } + return maxNumWaits; } public int getNumWorkerThreads() { return this.numWorkerThreads; } - public void setNumWorkerThreads(final int numWorkerThreads) { - this.numWorkerThreads = numWorkerThreads; - } - public int getMaxElementsCreated() { return this.traceId2trace.getMaxElements(); }