diff --git a/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java b/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java index 3cd2b2072416365c3aaf681570eb6a4b8549c6d5..f50b52208325806eafa34f92f4d230ebd6f1e173 100644 --- a/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java +++ b/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java @@ -43,7 +43,7 @@ public class RecordReaderConfiguration extends Configuration { private void buildConfiguration() { final Stage producerPipeline = this.buildProducerPipeline(); - addThreadableStage(producerPipeline); + declareActive(producerPipeline); } private Stage buildProducerPipeline() { diff --git a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java index 5b8c52a0f239212cf69e7806d473115e78ff9af9..1e145c27f2d2a16cb0c28630437dc5d0a48efab0 100644 --- a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java +++ b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java @@ -40,9 +40,9 @@ public class TcpTraceLoggingExtAnalysisConfiguration extends Configuration { private void init() { final Pipeline<Distributor<Long>> clockPipeline = this.buildClockPipeline(1000); - addThreadableStage(clockPipeline.getFirstStage()); + declareActive(clockPipeline.getFirstStage()); final Stage tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage()); - addThreadableStage(tcpPipeline); + declareActive(tcpPipeline); } private Pipeline<Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionConf.java b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionConf.java index b5e54e10ae5c2c5496b0143b2ca3b3a686905c91..c05eb5a65729e20d4d07c2612b7ea6ae8d0376f7 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionConf.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionConf.java @@ -54,13 +54,13 @@ public class TcpTraceReconstructionConf extends Configuration { private void init() { Pipeline<Distributor<Long>> clockStage = this.buildClockPipeline(1000); - addThreadableStage(clockStage.getFirstStage()); + declareActive(clockStage.getFirstStage()); Pipeline<Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); - addThreadableStage(clock2Stage.getFirstStage()); + declareActive(clock2Stage.getFirstStage()); Stage pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage()); - addThreadableStage(pipeline); + declareActive(pipeline); } private Pipeline<Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { @@ -88,12 +88,12 @@ public class TcpTraceReconstructionConf extends Configuration { // connect stages connectPorts(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), TCP_RELAY_MAX_SIZE); connectPorts(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); - // connectIntraThreads(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); - // connectIntraThreads(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + // connectPorts(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); + // connectPorts(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); connectPorts(instanceOfFilter.getMatchedOutputPort(), traceReconstructionFilter.getInputPort()); connectPorts(traceReconstructionFilter.getTraceValidOutputPort(), this.traceThroughputFilter.getInputPort()); connectPorts(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); - // connectIntraThreads(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort()); + // connectPorts(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort()); connectPorts(this.traceCounter.getOutputPort(), endStage.getInputPort()); connectPorts(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10); diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionConf.java b/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionConf.java index 9210ee16c354490db202c54bfc73a2125ce27fba..0afcb0504113fdb7cd703cb0428844697b55890c 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionConf.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionConf.java @@ -62,10 +62,10 @@ public class TraceReconstructionConf extends Configuration { private void init() { Clock clockStage = this.buildClockPipeline(); - addThreadableStage(clockStage); + declareActive(clockStage); Stage pipeline = this.buildPipeline(clockStage); - addThreadableStage(pipeline); + declareActive(pipeline); } private Clock buildClockPipeline() { @@ -105,7 +105,7 @@ public class TraceReconstructionConf extends Configuration { connectPorts(stringBufferFilter.getOutputPort(), instanceOfFilter.getInputPort()); connectPorts(instanceOfFilter.getMatchedOutputPort(), this.throughputFilter.getInputPort()); connectPorts(this.throughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - // connectIntraThreads(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + // connectPorts(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); connectPorts(traceReconstructionFilter.getTraceValidOutputPort(), merger.getNewInputPort()); connectPorts(traceReconstructionFilter.getTraceInvalidOutputPort(), merger.getNewInputPort()); connectPorts(merger.getOutputPort(), this.traceCounter.getInputPort()); diff --git a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java index b7ca516ce5358d383dc223a81617117afc700591..6484124baa62987c0d606cdca3ad0510fb571a2f 100644 --- a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java +++ b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java @@ -92,17 +92,17 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Conf private void init() { Pipeline<Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); - addThreadableStage(tcpPipeline.getFirstStage()); + declareActive(tcpPipeline.getFirstStage()); Pipeline<Distributor<Long>> clockStage = this.buildClockPipeline(1000); - addThreadableStage(clockStage.getFirstStage()); + declareActive(clockStage.getFirstStage()); Pipeline<Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); - addThreadableStage(clock2Stage.getFirstStage()); + declareActive(clock2Stage.getFirstStage()); for (int i = 0; i < this.numWorkerThreads; i++) { Stage pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), clock2Stage.getLastStage()); - addThreadableStage(pipeline); + declareActive(pipeline); } } @@ -187,8 +187,8 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Conf connectPorts(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort()); connectPorts(instanceOfFilter.getMatchedOutputPort(), traceReconstructionFilter.getInputPort()); connectPorts(traceReconstructionFilter.getTraceValidOutputPort(), traceCounter.getInputPort()); - // connectIntraThreads(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); - // connectIntraThreads(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort()); + // connectPorts(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); + // connectPorts(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort()); connectPorts(traceCounter.getOutputPort(), endStage.getInputPort()); return relay; diff --git a/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreadsConfiguration.java b/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreadsConfiguration.java index 1f4a36bbd80280b948d2ad48da46e765ea34dbb7..ebea3fcdabeb5be84c6c2dc468a4064e639663a4 100644 --- a/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreadsConfiguration.java +++ b/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreadsConfiguration.java @@ -88,17 +88,17 @@ public class TcpTraceReductionAnalysisWithThreadsConfiguration extends Configura private void init() { final Pipeline<Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); - addThreadableStage(tcpPipeline.getFirstStage()); + declareActive(tcpPipeline.getFirstStage()); final Pipeline<Distributor<Long>> clockStage = this.buildClockPipeline(1000); - addThreadableStage(clockStage.getFirstStage()); + declareActive(clockStage.getFirstStage()); final Pipeline<Distributor<Long>> clock2Stage = this.buildClockPipeline(5000); - addThreadableStage(clock2Stage.getFirstStage()); + declareActive(clock2Stage.getFirstStage()); for (int i = 0; i < this.numWorkerThreads; i++) { final Stage pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); - addThreadableStage(pipeline); + declareActive(pipeline); } } @@ -186,8 +186,8 @@ public class TcpTraceReductionAnalysisWithThreadsConfiguration extends Configura connectPorts(traceCounter.getOutputPort(), traceThroughputFilter.getInputPort()); connectPorts(traceThroughputFilter.getOutputPort(), endStage.getInputPort()); - // connectIntraThreads(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); - // connectIntraThreads(traceThroughputFilter.getOutputPort(), endStage.getInputPort()); + // connectPorts(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); + // connectPorts(traceThroughputFilter.getOutputPort(), endStage.getInputPort()); connectPorts(clock2Stage.getLastStage().getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10); connectPorts(clockStage.getLastStage().getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); diff --git a/src/test/java/teetime/stage/io/filesystem/Dir2RecordsFilterTest.java b/src/test/java/teetime/stage/io/filesystem/Dir2RecordsFilterTest.java index b9191f99d743c9804693d6556f018f5deacb3687..dc3f1d3242f1b3dd6159d5348f5b11172dc2f409 100644 --- a/src/test/java/teetime/stage/io/filesystem/Dir2RecordsFilterTest.java +++ b/src/test/java/teetime/stage/io/filesystem/Dir2RecordsFilterTest.java @@ -52,7 +52,7 @@ public class Dir2RecordsFilterTest { this.reader = new Dir2RecordsFilter(new ClassNameRegistryRepository()); connectPorts(this.producer.getOutputPort(), this.reader.getInputPort()); - addThreadableStage(producer); + declareActive(producer); } public OutputPort<IMonitoringRecord> getOutputPort() {