diff --git a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java index ad40462162d5ae4cb76f36bdb5b0603139afc065..80d8c33cbf5aa3b48fb7b95787857c854b961048 100644 --- a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java +++ b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java @@ -5,7 +5,6 @@ import java.util.List; import teetime.framework.AnalysisConfiguration; import teetime.framework.IStage; import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.Clock; @@ -26,9 +25,12 @@ public class TcpTraceLoggingExtAnalysisConfiguration extends AnalysisConfigurati private final IPipeFactory interThreadPipeFactory; public TcpTraceLoggingExtAnalysisConfiguration() { - intraThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - interThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + init(); + } + private void init() { final Pipeline<Distributor<Long>> clockPipeline = this.buildClockPipeline(1000); addThreadableStage(clockPipeline); final IStage tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage()); diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/ChwHomeTcpTraceReconstructionAnalysisTest.java b/src/performancetest/java/teetime/examples/traceReconstruction/ChwHomeTcpTraceReconstructionAnalysisTest.java index 1158dade907c11c392f4e6e30ff4cd16a3db55e7..825c2501605f8ed63a90de5e7f95d305fc43739e 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/ChwHomeTcpTraceReconstructionAnalysisTest.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/ChwHomeTcpTraceReconstructionAnalysisTest.java @@ -30,6 +30,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import teetime.framework.Analysis; import teetime.util.ListUtil; import teetime.util.StopWatch; import util.test.StatisticsUtil; @@ -60,7 +61,9 @@ public class ChwHomeTcpTraceReconstructionAnalysisTest { @Test public void performAnalysis() { - final TcpTraceReconstructionAnalysis analysis = new TcpTraceReconstructionAnalysis(); + final TcpTraceReconstructionAnalysisConfiguration configuration = new TcpTraceReconstructionAnalysisConfiguration(); + + final Analysis analysis = new Analysis(configuration); analysis.init(); this.stopWatch.start(); @@ -70,7 +73,7 @@ public class ChwHomeTcpTraceReconstructionAnalysisTest { this.stopWatch.end(); } - List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(analysis.getRecordThroughputs()); + List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(configuration.getRecordThroughputs()); Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(recordThroughputs); System.out.println("Median record throughput: " + recordQuintiles.get(0.5) + " elements/time unit"); @@ -78,8 +81,8 @@ public class ChwHomeTcpTraceReconstructionAnalysisTest { // Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(traceThroughputs); // System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit"); - assertEquals("#records", EXPECTED_NUM_RECORDS, analysis.getNumRecords()); - assertEquals("#traces", EXPECTED_NUM_TRACES, analysis.getNumTraces()); + assertEquals("#records", EXPECTED_NUM_RECORDS, configuration.getNumRecords()); + assertEquals("#traces", EXPECTED_NUM_TRACES, configuration.getNumTraces()); // TraceEventRecords trace6884 = analysis.getElementCollection().get(0); // assertEquals(6884, trace6884.getTraceMetadata().getTraceId()); diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java b/src/performancetest/java/teetime/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java index 8bc965b84c26c72a8074ab469ae9ea76ba4ea72d..7c7cc3b0b8276ccadb1df5c07fcaa944dd532a34 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java @@ -24,6 +24,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import teetime.framework.Analysis; import teetime.util.StopWatch; import util.test.StatisticsUtil; @@ -53,7 +54,9 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest { @Test public void performAnalysis() { - final TcpTraceReconstructionAnalysis analysis = new TcpTraceReconstructionAnalysis(); + final TcpTraceReconstructionAnalysisConfiguration configuration = new TcpTraceReconstructionAnalysisConfiguration(); + + Analysis analysis = new Analysis(configuration); analysis.init(); this.stopWatch.start(); @@ -63,11 +66,11 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest { this.stopWatch.end(); } - Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs()); + Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(configuration.getTraceThroughputs()); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); - assertEquals(EXPECTED_NUM_RECORDS, analysis.getNumRecords()); - assertEquals(EXPECTED_NUM_TRACES, analysis.getNumTraces()); + assertEquals(EXPECTED_NUM_RECORDS, configuration.getNumRecords()); + assertEquals(EXPECTED_NUM_TRACES, configuration.getNumTraces()); // TraceEventRecords trace6884 = analysis.getElementCollection().get(0); // assertEquals(6884, trace6884.getTraceMetadata().getTraceId()); diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysisConfiguration.java similarity index 57% rename from src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java rename to src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysisConfiguration.java index abeb2fa9409d9bb7dc5fc7a630dccc1b56d7d4b4..9b2a5e47a899ac4b66fc770565cfcc60d6261fdf 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysisConfiguration.java @@ -3,14 +3,17 @@ package teetime.examples.traceReconstruction; import java.util.LinkedList; import java.util.List; +import teetime.framework.AnalysisConfiguration; import teetime.framework.IStage; import teetime.framework.RunnableStage; -import teetime.framework.pipe.SingleElementPipe; -import teetime.framework.pipe.SpScPipe; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.Clock; import teetime.stage.Counter; import teetime.stage.ElementThroughputMeasuringStage; import teetime.stage.InstanceOfFilter; +import teetime.stage.Pipeline; import teetime.stage.basic.Sink; import teetime.stage.basic.distributor.Distributor; import teetime.stage.io.network.TcpReader; @@ -22,7 +25,7 @@ import kieker.analysis.plugin.filter.flow.TraceEventRecords; import kieker.common.record.IMonitoringRecord; import kieker.common.record.flow.IFlowRecord; -public class TcpTraceReconstructionAnalysis { +public class TcpTraceReconstructionAnalysisConfiguration extends AnalysisConfiguration { private static final int MIO = 1000000; private static final int TCP_RELAY_MAX_SIZE = 2 * MIO; @@ -40,25 +43,34 @@ public class TcpTraceReconstructionAnalysis { private ElementThroughputMeasuringStage<IFlowRecord> recordThroughputFilter; private ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter; - public void init() { - IStage clockStage = this.buildClockPipeline(1000); + private final IPipeFactory intraThreadPipeFactory; + private final IPipeFactory interThreadPipeFactory; + + public TcpTraceReconstructionAnalysisConfiguration() { + intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + init(); + } + + private void init() { + Pipeline<Distributor<Long>> clockStage = this.buildClockPipeline(1000); this.clockThread = new Thread(new RunnableStage(clockStage)); - IStage clock2Stage = this.buildClockPipeline(2000); + Pipeline<Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); IStage pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage()); this.workerThread = new Thread(new RunnableStage(pipeline)); } - private IStage buildClockPipeline(final long intervalDelayInMs) { + private Pipeline<Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); clock.setIntervalDelayInMs(intervalDelayInMs); Distributor<Long> distributor = new Distributor<Long>(); - SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); + intraThreadPipeFactory.create(clock.getOutputPort(), distributor.getInputPort()); - return clock; + return new Pipeline<Distributor<Long>>(clock, distributor); } private IStage buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { @@ -74,37 +86,22 @@ public class TcpTraceReconstructionAnalysis { Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>(); // connect stages - SpScPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), TCP_RELAY_MAX_SIZE); - SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); - // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); - // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), this.traceThroughputFilter.getInputPort()); - SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); - // SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort()); - SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort()); - - SpScPipe.connect(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10); - SpScPipe.connect(clock2Stage.getNewOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10); + interThreadPipeFactory.create(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), TCP_RELAY_MAX_SIZE); + intraThreadPipeFactory.create(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); + // intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); + // intraThreadPipeFactory.create(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), this.traceThroughputFilter.getInputPort()); + intraThreadPipeFactory.create(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); + // intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort()); + intraThreadPipeFactory.create(this.traceCounter.getOutputPort(), endStage.getInputPort()); + + interThreadPipeFactory.create(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10); + interThreadPipeFactory.create(clock2Stage.getNewOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10); return tcpReader; } - public void start() { - - this.workerThread.start(); - // this.clockThread.start(); - this.clock2Thread.start(); - - try { - this.workerThread.join(); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - this.clockThread.interrupt(); - this.clock2Thread.interrupt(); - } - public List<TraceEventRecords> getElementCollection() { return this.elementCollection; }