diff --git a/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java b/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java index f525de1ef0d08464181f828ccfb06dc4e9c29003..ddba73bbf618c005384b861b23a50695e6ab2bd1 100644 --- a/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java +++ b/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java @@ -19,11 +19,8 @@ import java.io.File; import java.util.LinkedList; import java.util.List; -import teetime.framework.Stage; import teetime.framework.AnalysisConfiguration; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.framework.Stage; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; import teetime.stage.className.ClassNameRegistryRepository; @@ -39,10 +36,8 @@ import kieker.common.record.IMonitoringRecord; public class RecordReaderConfiguration extends AnalysisConfiguration { private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>(); - private final IPipeFactory intraThreadPipeFactory; public RecordReaderConfiguration() { - intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); this.buildConfiguration(); } @@ -60,8 +55,8 @@ public class RecordReaderConfiguration extends AnalysisConfiguration { CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection); // connect stages - intraThreadPipeFactory.create(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort()); - intraThreadPipeFactory.create(dir2RecordsFilter.getOutputPort(), collector.getInputPort()); + connectIntraThreads(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort()); + connectIntraThreads(dir2RecordsFilter.getOutputPort(), collector.getInputPort()); return initialElementProducer; } diff --git a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java index 6a4d000d9174e9cfc2a22aa683b2a295f05859ed..e7e43e1437776df5e7f687c2910de44f30050fd7 100644 --- a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java +++ b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java @@ -17,12 +17,9 @@ package teetime.examples.traceReading; import java.util.List; -import teetime.framework.Stage; import teetime.framework.AnalysisConfiguration; import teetime.framework.Pipeline; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.framework.Stage; import teetime.stage.Clock; import teetime.stage.Counter; import teetime.stage.ElementThroughputMeasuringStage; @@ -36,12 +33,8 @@ public class TcpTraceLoggingExtAnalysisConfiguration extends AnalysisConfigurati private Counter<IMonitoringRecord> recordCounter; private ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputStage; - private final IPipeFactory intraThreadPipeFactory; - private final IPipeFactory interThreadPipeFactory; public TcpTraceLoggingExtAnalysisConfiguration() { - intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); init(); } @@ -58,7 +51,7 @@ public class TcpTraceLoggingExtAnalysisConfiguration extends AnalysisConfigurati clockStage.setIntervalDelayInMs(intervalDelayInMs); Distributor<Long> distributor = new Distributor<Long>(); - intraThreadPipeFactory.create(clockStage.getOutputPort(), distributor.getInputPort()); + connectIntraThreads(clockStage.getOutputPort(), distributor.getInputPort()); return new Pipeline<Distributor<Long>>(clockStage, distributor); } @@ -69,12 +62,12 @@ public class TcpTraceLoggingExtAnalysisConfiguration extends AnalysisConfigurati this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>(); Sink<IMonitoringRecord> endStage = new Sink<IMonitoringRecord>(); - intraThreadPipeFactory.create(tcpReader.getOutputPort(), this.recordCounter.getInputPort()); - intraThreadPipeFactory.create(this.recordCounter.getOutputPort(), this.recordThroughputStage.getInputPort()); - intraThreadPipeFactory.create(this.recordThroughputStage.getOutputPort(), endStage.getInputPort()); + connectIntraThreads(tcpReader.getOutputPort(), this.recordCounter.getInputPort()); + connectIntraThreads(this.recordCounter.getOutputPort(), this.recordThroughputStage.getInputPort()); + connectIntraThreads(this.recordThroughputStage.getOutputPort(), endStage.getInputPort()); // intraThreadPipeFactory.create(this.recordCounter.getOutputPort(), endStage.getInputPort()); - interThreadPipeFactory.create(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10); + connectBoundedInterThreads(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10); return tcpReader; } diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionConf.java b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionConf.java index e69864138246074d2ef680127ed16a44e37eb321..e0bf65d9f31525d8161e38dee74e7cdd348e1de4 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionConf.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionConf.java @@ -20,9 +20,6 @@ import java.util.List; import teetime.framework.AnalysisConfiguration; import teetime.framework.Pipeline; import teetime.framework.Stage; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.Clock; import teetime.stage.Counter; import teetime.stage.ElementThroughputMeasuringStage; @@ -50,12 +47,7 @@ public class TcpTraceReconstructionConf extends AnalysisConfiguration { private ElementThroughputMeasuringStage<IFlowRecord> recordThroughputFilter; private ElementThroughputMeasuringStage<EventBasedTrace> traceThroughputFilter; - private final IPipeFactory intraThreadPipeFactory; - private final IPipeFactory interThreadPipeFactory; - public TcpTraceReconstructionConf() { - intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); this.traceId2trace = new ConcurrentHashMapWithDefault<Long, EventBasedTrace>(EventBasedTraceFactory.INSTANCE); init(); } @@ -76,7 +68,7 @@ public class TcpTraceReconstructionConf extends AnalysisConfiguration { clock.setIntervalDelayInMs(intervalDelayInMs); Distributor<Long> distributor = new Distributor<Long>(); - intraThreadPipeFactory.create(clock.getOutputPort(), distributor.getInputPort()); + connectIntraThreads(clock.getOutputPort(), distributor.getInputPort()); return new Pipeline<Distributor<Long>>(clock, distributor); } @@ -94,18 +86,18 @@ public class TcpTraceReconstructionConf extends AnalysisConfiguration { Sink<EventBasedTrace> endStage = new Sink<EventBasedTrace>(); // connect stages - interThreadPipeFactory.create(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), TCP_RELAY_MAX_SIZE); - intraThreadPipeFactory.create(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); - // intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); - // intraThreadPipeFactory.create(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - intraThreadPipeFactory.create(instanceOfFilter.getMatchedOutputPort(), traceReconstructionFilter.getInputPort()); - intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), this.traceThroughputFilter.getInputPort()); - intraThreadPipeFactory.create(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); - // intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort()); - intraThreadPipeFactory.create(this.traceCounter.getOutputPort(), endStage.getInputPort()); - - interThreadPipeFactory.create(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10); - interThreadPipeFactory.create(clock2Stage.getNewOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10); + connectBoundedInterThreads(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), TCP_RELAY_MAX_SIZE); + connectIntraThreads(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); + // connectIntraThreads(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); + // connectIntraThreads(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + connectIntraThreads(instanceOfFilter.getMatchedOutputPort(), traceReconstructionFilter.getInputPort()); + connectIntraThreads(traceReconstructionFilter.getTraceValidOutputPort(), this.traceThroughputFilter.getInputPort()); + connectIntraThreads(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); + // connectIntraThreads(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort()); + connectIntraThreads(this.traceCounter.getOutputPort(), endStage.getInputPort()); + + connectBoundedInterThreads(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10); + connectBoundedInterThreads(clock2Stage.getNewOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10); return tcpReader; } diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionConf.java b/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionConf.java index 11d5c23ee5b8b7a677f800e81c5b714d8dd5eca0..3efaa7ef9ebf23865be651c9062a586813742590 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionConf.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionConf.java @@ -21,9 +21,6 @@ import java.util.List; import teetime.framework.AnalysisConfiguration; import teetime.framework.Stage; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.Cache; import teetime.stage.Clock; import teetime.stage.CollectorSink; @@ -50,8 +47,6 @@ public class TraceReconstructionConf extends AnalysisConfiguration { private final List<EventBasedTrace> elementCollection = new LinkedList<EventBasedTrace>(); private final File inputDir; - private final IPipeFactory intraThreadPipeFactory; - private final IPipeFactory interThreadPipeFactory; private final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace; private ClassNameRegistryRepository classNameRegistryRepository; @@ -61,8 +56,6 @@ public class TraceReconstructionConf extends AnalysisConfiguration { public TraceReconstructionConf(final File inputDir) { this.inputDir = inputDir; - intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); this.traceId2trace = new ConcurrentHashMapWithDefault<Long, EventBasedTrace>(EventBasedTraceFactory.INSTANCE); init(); } @@ -105,20 +98,20 @@ public class TraceReconstructionConf extends AnalysisConfiguration { stringBufferFilter.getDataTypeHandlers().add(new StringHandler()); // connect stages - intraThreadPipeFactory.create(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort()); - intraThreadPipeFactory.create(dir2RecordsFilter.getOutputPort(), this.recordCounter.getInputPort()); - intraThreadPipeFactory.create(this.recordCounter.getOutputPort(), cache.getInputPort()); - intraThreadPipeFactory.create(cache.getOutputPort(), stringBufferFilter.getInputPort()); - intraThreadPipeFactory.create(stringBufferFilter.getOutputPort(), instanceOfFilter.getInputPort()); - intraThreadPipeFactory.create(instanceOfFilter.getMatchedOutputPort(), this.throughputFilter.getInputPort()); - intraThreadPipeFactory.create(this.throughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - // intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), merger.getNewInputPort()); - intraThreadPipeFactory.create(traceReconstructionFilter.getTraceInvalidOutputPort(), merger.getNewInputPort()); - intraThreadPipeFactory.create(merger.getOutputPort(), this.traceCounter.getInputPort()); - intraThreadPipeFactory.create(this.traceCounter.getOutputPort(), collector.getInputPort()); - - interThreadPipeFactory.create(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1); + connectIntraThreads(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort()); + connectIntraThreads(dir2RecordsFilter.getOutputPort(), this.recordCounter.getInputPort()); + connectIntraThreads(this.recordCounter.getOutputPort(), cache.getInputPort()); + connectIntraThreads(cache.getOutputPort(), stringBufferFilter.getInputPort()); + connectIntraThreads(stringBufferFilter.getOutputPort(), instanceOfFilter.getInputPort()); + connectIntraThreads(instanceOfFilter.getMatchedOutputPort(), this.throughputFilter.getInputPort()); + connectIntraThreads(this.throughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + // connectIntraThreads(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + connectIntraThreads(traceReconstructionFilter.getTraceValidOutputPort(), merger.getNewInputPort()); + connectIntraThreads(traceReconstructionFilter.getTraceInvalidOutputPort(), merger.getNewInputPort()); + connectIntraThreads(merger.getOutputPort(), this.traceCounter.getInputPort()); + connectIntraThreads(this.traceCounter.getOutputPort(), collector.getInputPort()); + + connectBoundedInterThreads(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1); return initialElementProducer; } diff --git a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java index 787169d6e46b8274464db404c99a81b07a5d4d1a..abcdbf50771db959d10e2c8e40d6e9866ef3a559 100644 --- a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java +++ b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java @@ -27,9 +27,6 @@ import teetime.framework.Pipeline; import teetime.framework.Stage; import teetime.framework.pipe.IMonitorablePipe; import teetime.framework.pipe.IPipe; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.Clock; import teetime.stage.Counter; import teetime.stage.ElementDelayMeasuringStage; @@ -71,8 +68,6 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal private final StageFactory<ElementThroughputMeasuringStage<EventBasedTrace>> traceThroughputFilterFactory; private final List<IMonitorablePipe> tcpRelayPipes = new LinkedList<IMonitorablePipe>(); - private final IPipeFactory intraThreadPipeFactory; - private final IPipeFactory interThreadPipeFactory; @SuppressWarnings({ "rawtypes", "unchecked" }) public TcpTraceReconstructionAnalysisWithThreadsConfiguration(final int numWorkerThreads) { @@ -93,8 +88,6 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal throw new IllegalArgumentException(e); } - intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); init(); } @@ -118,7 +111,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal TcpReaderStage tcpReader = new TcpReaderStage(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); - intraThreadPipeFactory.create(tcpReader.getOutputPort(), distributor.getInputPort()); + connectIntraThreads(tcpReader.getOutputPort(), distributor.getInputPort()); return new Pipeline<Distributor<IMonitoringRecord>>(tcpReader, distributor); } @@ -129,7 +122,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal clock.setIntervalDelayInMs(intervalDelayInMs); Distributor<Long> distributor = new Distributor<Long>(); - intraThreadPipeFactory.create(clock.getOutputPort(), distributor.getInputPort()); + connectIntraThreads(clock.getOutputPort(), distributor.getInputPort()); return new Pipeline<Distributor<Long>>(clock, distributor); } @@ -182,22 +175,22 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal // EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>(); // connect stages - IPipe tcpRelayPipe = interThreadPipeFactory.create(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); + IPipe tcpRelayPipe = connectBoundedInterThreads(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); this.tcpRelayPipes.add((IMonitorablePipe) tcpRelayPipe); // SysOutFilter<EventBasedTrace> sysout = new SysOutFilter<EventBasedTrace>(tcpRelayPipe); - interThreadPipeFactory.create(clockStage.getNewOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10); - interThreadPipeFactory.create(clock2Stage.getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); - - intraThreadPipeFactory.create(relay.getOutputPort(), recordCounter.getInputPort()); - intraThreadPipeFactory.create(recordCounter.getOutputPort(), recordThroughputFilter.getInputPort()); - intraThreadPipeFactory.create(recordThroughputFilter.getOutputPort(), traceMetadataCounter.getInputPort()); - intraThreadPipeFactory.create(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort()); - intraThreadPipeFactory.create(instanceOfFilter.getMatchedOutputPort(), traceReconstructionFilter.getInputPort()); - intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), traceCounter.getInputPort()); - // intraThreadPipeFactory.create(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); - // intraThreadPipeFactory.create(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort()); - intraThreadPipeFactory.create(traceCounter.getOutputPort(), endStage.getInputPort()); + connectBoundedInterThreads(clockStage.getNewOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10); + connectBoundedInterThreads(clock2Stage.getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); + + connectIntraThreads(relay.getOutputPort(), recordCounter.getInputPort()); + connectIntraThreads(recordCounter.getOutputPort(), recordThroughputFilter.getInputPort()); + connectIntraThreads(recordThroughputFilter.getOutputPort(), traceMetadataCounter.getInputPort()); + connectIntraThreads(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort()); + connectIntraThreads(instanceOfFilter.getMatchedOutputPort(), traceReconstructionFilter.getInputPort()); + connectIntraThreads(traceReconstructionFilter.getTraceValidOutputPort(), traceCounter.getInputPort()); + // connectIntraThreads(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); + // connectIntraThreads(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort()); + connectIntraThreads(traceCounter.getOutputPort(), endStage.getInputPort()); return relay; } diff --git a/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreadsConfiguration.java b/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreadsConfiguration.java index 8c03d88b7fe1422ed58e5e3690081ea8c96d2857..cec083fd310ce47320a429283b0beb174517aca4 100644 --- a/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreadsConfiguration.java +++ b/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreadsConfiguration.java @@ -29,9 +29,6 @@ import teetime.framework.Pipeline; import teetime.framework.Stage; import teetime.framework.pipe.IMonitorablePipe; import teetime.framework.pipe.IPipe; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.Clock; import teetime.stage.Counter; import teetime.stage.ElementDelayMeasuringStage; @@ -67,8 +64,6 @@ public class TcpTraceReductionAnalysisWithThreadsConfiguration extends AnalysisC private final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace; private final Map<EventBasedTrace, TraceAggregationBuffer> trace2buffer; - private final IPipeFactory intraThreadPipeFactory; - private final IPipeFactory interThreadPipeFactory; @SuppressWarnings({ "rawtypes", "unchecked" }) public TcpTraceReductionAnalysisWithThreadsConfiguration(final int numWorkerThreads) { @@ -89,8 +84,6 @@ public class TcpTraceReductionAnalysisWithThreadsConfiguration extends AnalysisC this.traceId2trace = new ConcurrentHashMapWithDefault<Long, EventBasedTrace>(EventBasedTraceFactory.INSTANCE); this.trace2buffer = new TreeMap<EventBasedTrace, TraceAggregationBuffer>(new EventBasedTraceComperator()); - this.intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - this.interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); init(); } @@ -114,7 +107,7 @@ public class TcpTraceReductionAnalysisWithThreadsConfiguration extends AnalysisC final TcpReaderStage tcpReader = new TcpReaderStage(); final Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); - intraThreadPipeFactory.create(tcpReader.getOutputPort(), distributor.getInputPort()); + connectIntraThreads(tcpReader.getOutputPort(), distributor.getInputPort()); return new Pipeline<Distributor<IMonitoringRecord>>(tcpReader, distributor); } @@ -125,7 +118,7 @@ public class TcpTraceReductionAnalysisWithThreadsConfiguration extends AnalysisC clock.setIntervalDelayInMs(intervalDelayInMs); final Distributor<Long> distributor = new Distributor<Long>(); - intraThreadPipeFactory.create(clock.getOutputPort(), distributor.getInputPort()); + connectIntraThreads(clock.getOutputPort(), distributor.getInputPort()); return new Pipeline<Distributor<Long>>(clock, distributor); } @@ -182,23 +175,23 @@ public class TcpTraceReductionAnalysisWithThreadsConfiguration extends AnalysisC Sink<TraceAggregationBuffer> endStage = new Sink<TraceAggregationBuffer>(); // connect stages - final IPipe pipe = interThreadPipeFactory.create(tcpPipeline.getLastStage().getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); + final IPipe pipe = connectBoundedInterThreads(tcpPipeline.getLastStage().getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); this.tcpRelayPipes.add((IMonitorablePipe) pipe); - intraThreadPipeFactory.create(relay.getOutputPort(), recordCounter.getInputPort()); - intraThreadPipeFactory.create(recordCounter.getOutputPort(), traceMetadataCounter.getInputPort()); - intraThreadPipeFactory.create(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort()); - intraThreadPipeFactory.create(instanceOfFilter.getMatchedOutputPort(), traceReconstructionFilter.getInputPort()); - intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), traceReductionFilter.getInputPort()); - intraThreadPipeFactory.create(traceReductionFilter.getOutputPort(), traceCounter.getInputPort()); - intraThreadPipeFactory.create(traceCounter.getOutputPort(), traceThroughputFilter.getInputPort()); - intraThreadPipeFactory.create(traceThroughputFilter.getOutputPort(), endStage.getInputPort()); + connectIntraThreads(relay.getOutputPort(), recordCounter.getInputPort()); + connectIntraThreads(recordCounter.getOutputPort(), traceMetadataCounter.getInputPort()); + connectIntraThreads(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort()); + connectIntraThreads(instanceOfFilter.getMatchedOutputPort(), traceReconstructionFilter.getInputPort()); + connectIntraThreads(traceReconstructionFilter.getTraceValidOutputPort(), traceReductionFilter.getInputPort()); + connectIntraThreads(traceReductionFilter.getOutputPort(), traceCounter.getInputPort()); + connectIntraThreads(traceCounter.getOutputPort(), traceThroughputFilter.getInputPort()); + connectIntraThreads(traceThroughputFilter.getOutputPort(), endStage.getInputPort()); - // intraThreadPipeFactory.create(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); - // intraThreadPipeFactory.create(traceThroughputFilter.getOutputPort(), endStage.getInputPort()); + // connectIntraThreads(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort()); + // connectIntraThreads(traceThroughputFilter.getOutputPort(), endStage.getInputPort()); - interThreadPipeFactory.create(clock2Stage.getLastStage().getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10); - interThreadPipeFactory.create(clockStage.getLastStage().getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); + connectBoundedInterThreads(clock2Stage.getLastStage().getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10); + connectBoundedInterThreads(clockStage.getLastStage().getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); return relay; }