From 66255150004635b137ea9053976eb084fd3c610f Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Tue, 4 Nov 2014 15:53:19 +0100 Subject: [PATCH] renamed deprecated stages --- .settings/edu.umd.cs.findbugs.core.prefs | 2 +- src/main/java/util/KiekerLoadDriver.java | 6 ++-- .../kiekerdays/TcpTraceLoggingExplorviz.java | 4 +-- .../kiekerdays/TcpTraceReconstruction.java | 10 +++---- .../kiekerdays/TcpTraceReduction.java | 16 +++++------ .../TcpTraceLoggingExtAnalysis.java | 14 +++++----- .../TcpTraceReconstructionAnalysis.java | 16 +++++------ .../TraceReconstructionAnalysis.java | 8 +++--- ...ctionAnalysisWithThreadsConfiguration.java | 22 +++++++-------- .../TcpTraceReductionAnalysisWithThreads.java | 28 +++++++++---------- 10 files changed, 63 insertions(+), 63 deletions(-) diff --git a/.settings/edu.umd.cs.findbugs.core.prefs b/.settings/edu.umd.cs.findbugs.core.prefs index c2826c29..cc4d4561 100644 --- a/.settings/edu.umd.cs.findbugs.core.prefs +++ b/.settings/edu.umd.cs.findbugs.core.prefs @@ -1,5 +1,5 @@ #FindBugs User Preferences -#Thu Oct 23 07:49:05 CEST 2014 +#Tue Nov 04 15:50:16 CET 2014 detector_threshold=3 effort=max excludefilter0=.fbExcludeFilterFile|true diff --git a/src/main/java/util/KiekerLoadDriver.java b/src/main/java/util/KiekerLoadDriver.java index a7a5a238..3fa220d1 100644 --- a/src/main/java/util/KiekerLoadDriver.java +++ b/src/main/java/util/KiekerLoadDriver.java @@ -14,7 +14,7 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; -import teetime.framework.HeadPipeline; +import teetime.framework.OldHeadPipeline; import teetime.framework.HeadStage; import teetime.framework.RunnableStage; import teetime.framework.pipe.SingleElementPipe; @@ -40,14 +40,14 @@ public class KiekerLoadDriver { this.runnableStage = new RunnableStage(producerPipeline); } - private HeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>> buildProducerPipeline(final File directory) { + private OldHeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>> buildProducerPipeline(final File directory) { ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository(); // create stages InitialElementProducer<File> initialElementProducer = new InitialElementProducer<File>(directory); Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository); CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection); - final HeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>> pipeline = new HeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>>(); + final OldHeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>> pipeline = new OldHeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>>(); pipeline.setFirstStage(initialElementProducer); pipeline.setLastStage(collector); diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingExplorviz.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingExplorviz.java index 9f3eab50..f97afc82 100644 --- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingExplorviz.java +++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingExplorviz.java @@ -1,6 +1,6 @@ package teetime.examples.kiekerdays; -import teetime.framework.HeadPipeline; +import teetime.framework.OldHeadPipeline; import teetime.framework.HeadStage; import teetime.framework.RunnableStage; import teetime.framework.pipe.SingleElementPipe; @@ -36,7 +36,7 @@ public class TcpTraceLoggingExplorviz { SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort()); // create and configure pipeline - HeadPipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>> pipeline = new HeadPipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>>(); + OldHeadPipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>> pipeline = new OldHeadPipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(endStage); return tcpReader; diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java index db4e87f8..46b62adc 100644 --- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java +++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java @@ -4,7 +4,7 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; -import teetime.framework.HeadPipeline; +import teetime.framework.OldHeadPipeline; import teetime.framework.HeadStage; import teetime.framework.RunnableStage; import teetime.framework.pipe.SingleElementPipe; @@ -38,7 +38,7 @@ public class TcpTraceReconstruction { private int numWorkerThreads; public void init() { - HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); @@ -50,14 +50,14 @@ public class TcpTraceReconstruction { } } - private HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { + private OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { TcpReader tcpReader = new TcpReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TcpReader, Distributor<IMonitoringRecord>>(); + OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(distributor); return pipeline; @@ -80,7 +80,7 @@ public class TcpTraceReconstruction { SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), endStage.getInputPort()); // create and configure pipeline - HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); + OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); pipeline.setFirstStage(relay); pipeline.setLastStage(endStage); return pipeline; diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java index d50830d1..e8649f66 100644 --- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java +++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java @@ -6,7 +6,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import teetime.framework.HeadPipeline; +import teetime.framework.OldHeadPipeline; import teetime.framework.HeadStage; import teetime.framework.RunnableStage; import teetime.framework.pipe.SingleElementPipe; @@ -46,10 +46,10 @@ public class TcpTraceReduction { private int numWorkerThreads; public void init() { - HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); - HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(5000); + OldHeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(5000); this.clockThread = new Thread(new RunnableStage(clockStage)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); @@ -61,20 +61,20 @@ public class TcpTraceReduction { } } - private HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { + private OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { TcpReader tcpReader = new TcpReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TcpReader, Distributor<IMonitoringRecord>>(); + OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(distributor); return pipeline; } - private HeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { + private OldHeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); clock.setInitialDelayInMs(intervalDelayInMs); clock.setIntervalDelayInMs(intervalDelayInMs); @@ -83,7 +83,7 @@ public class TcpTraceReduction { SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - HeadPipeline<Clock, Distributor<Long>> pipeline = new HeadPipeline<Clock, Distributor<Long>>(); + OldHeadPipeline<Clock, Distributor<Long>> pipeline = new OldHeadPipeline<Clock, Distributor<Long>>(); pipeline.setFirstStage(clock); pipeline.setLastStage(distributor); return pipeline; @@ -110,7 +110,7 @@ public class TcpTraceReduction { SpScPipe.connect(clockStage.getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10); // create and configure pipeline - HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); + OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); pipeline.setFirstStage(relay); pipeline.setLastStage(endStage); return pipeline; diff --git a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java index 8808f3ee..d4b06908 100644 --- a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java +++ b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java @@ -2,7 +2,7 @@ package teetime.examples.traceReading; import java.util.List; -import teetime.framework.HeadPipeline; +import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableStage; import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SpScPipe; @@ -23,7 +23,7 @@ public class TcpTraceLoggingExtAnalysis { private Counter<IMonitoringRecord> recordCounter; private ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputStage; - private HeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { + private OldHeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { Clock clockStage = new Clock(); clockStage.setInitialDelayInMs(intervalDelayInMs); clockStage.setIntervalDelayInMs(intervalDelayInMs); @@ -32,13 +32,13 @@ public class TcpTraceLoggingExtAnalysis { SingleElementPipe.connect(clockStage.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - HeadPipeline<Clock, Distributor<Long>> pipeline = new HeadPipeline<Clock, Distributor<Long>>(); + OldHeadPipeline<Clock, Distributor<Long>> pipeline = new OldHeadPipeline<Clock, Distributor<Long>>(); pipeline.setFirstStage(clockStage); pipeline.setLastStage(distributor); return pipeline; } - private HeadPipeline<?, ?> buildTcpPipeline(final Distributor<Long> previousClockStage) { + private OldHeadPipeline<?, ?> buildTcpPipeline(final Distributor<Long> previousClockStage) { TcpReader tcpReader = new TcpReader(); this.recordCounter = new Counter<IMonitoringRecord>(); this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>(); @@ -52,7 +52,7 @@ public class TcpTraceLoggingExtAnalysis { SpScPipe.connect(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10); // create and configure pipeline - HeadPipeline<TcpReader, Sink<IMonitoringRecord>> pipeline = new HeadPipeline<TcpReader, Sink<IMonitoringRecord>>(); + OldHeadPipeline<TcpReader, Sink<IMonitoringRecord>> pipeline = new OldHeadPipeline<TcpReader, Sink<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(endStage); return pipeline; @@ -60,10 +60,10 @@ public class TcpTraceLoggingExtAnalysis { public void init() { - HeadPipeline<Clock, Distributor<Long>> clockPipeline = this.buildClockPipeline(1000); + OldHeadPipeline<Clock, Distributor<Long>> clockPipeline = this.buildClockPipeline(1000); this.clockThread = new Thread(new RunnableStage(clockPipeline)); - HeadPipeline<?, ?> tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage()); + OldHeadPipeline<?, ?> tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage()); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); } diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java index b2954cf4..9a4cfff8 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java @@ -3,7 +3,7 @@ package teetime.examples.traceReconstruction; import java.util.LinkedList; import java.util.List; -import teetime.framework.HeadPipeline; +import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableStage; import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SpScPipe; @@ -41,17 +41,17 @@ public class TcpTraceReconstructionAnalysis { private ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter; public void init() { - HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); + OldHeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); this.clockThread = new Thread(new RunnableStage(clockStage)); - HeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); + OldHeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); - HeadPipeline<?, ?> pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage()); + OldHeadPipeline<?, ?> pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage()); this.workerThread = new Thread(new RunnableStage(pipeline)); } - private HeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { + private OldHeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); clock.setIntervalDelayInMs(intervalDelayInMs); Distributor<Long> distributor = new Distributor<Long>(); @@ -59,13 +59,13 @@ public class TcpTraceReconstructionAnalysis { SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - HeadPipeline<Clock, Distributor<Long>> pipeline = new HeadPipeline<Clock, Distributor<Long>>(); + OldHeadPipeline<Clock, Distributor<Long>> pipeline = new OldHeadPipeline<Clock, Distributor<Long>>(); pipeline.setFirstStage(clock); pipeline.setLastStage(distributor); return pipeline; } - private HeadPipeline<TcpReader, Sink<TraceEventRecords>> buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { + private OldHeadPipeline<TcpReader, Sink<TraceEventRecords>> buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { // create stages TcpReader tcpReader = new TcpReader(); this.recordCounter = new Counter<IMonitoringRecord>(); @@ -92,7 +92,7 @@ public class TcpTraceReconstructionAnalysis { SpScPipe.connect(clock2Stage.getNewOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10); // create and configure pipeline - HeadPipeline<TcpReader, Sink<TraceEventRecords>> pipeline = new HeadPipeline<TcpReader, Sink<TraceEventRecords>>(); + OldHeadPipeline<TcpReader, Sink<TraceEventRecords>> pipeline = new OldHeadPipeline<TcpReader, Sink<TraceEventRecords>>(); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(endStage); return pipeline; diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java index f1246f5a..1c171f1f 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java @@ -4,7 +4,7 @@ import java.io.File; import java.util.LinkedList; import java.util.List; -import teetime.framework.HeadPipeline; +import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableStage; import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SpScPipe; @@ -49,7 +49,7 @@ public class TraceReconstructionAnalysis { Clock clockStage = this.buildClockPipeline(); this.clockThread = new Thread(new RunnableStage(clockStage)); - HeadPipeline<?, ?> pipeline = this.buildPipeline(clockStage); + OldHeadPipeline<?, ?> pipeline = this.buildPipeline(clockStage); this.workerThread = new Thread(new RunnableStage(pipeline)); } @@ -60,7 +60,7 @@ public class TraceReconstructionAnalysis { return clock; } - private HeadPipeline<?, ?> buildPipeline(final Clock clockStage) { + private OldHeadPipeline<?, ?> buildPipeline(final Clock clockStage) { this.classNameRegistryRepository = new ClassNameRegistryRepository(); // create stages @@ -99,7 +99,7 @@ public class TraceReconstructionAnalysis { SpScPipe.connect(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1); // create and configure pipeline - HeadPipeline<InitialElementProducer<File>, CollectorSink<TraceEventRecords>> pipeline = new HeadPipeline<InitialElementProducer<File>, CollectorSink<TraceEventRecords>>(); + OldHeadPipeline<InitialElementProducer<File>, CollectorSink<TraceEventRecords>> pipeline = new OldHeadPipeline<InitialElementProducer<File>, CollectorSink<TraceEventRecords>>(); pipeline.setFirstStage(initialElementProducer); pipeline.setLastStage(collector); return pipeline; diff --git a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java index 660e8239..0169a2ce 100644 --- a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java +++ b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java @@ -7,7 +7,7 @@ import java.util.LinkedList; import java.util.List; import teetime.framework.AnalysisConfiguration; -import teetime.framework.HeadPipeline; +import teetime.framework.OldHeadPipeline; import teetime.framework.Stage; import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SpScPipe; @@ -72,37 +72,37 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal } public void buildConfiguration() { - final HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + final OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); this.getFiniteProducerStages().add(tcpPipeline); - final HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); + final OldHeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); this.getInfiniteProducerStages().add(clockStage); - final HeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); + final OldHeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); this.getInfiniteProducerStages().add(clock2Stage); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); for (int i = 0; i < this.numWorkerThreads; i++) { - HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), + OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), clock2Stage.getLastStage()); this.getConsumerStages().add(pipeline); } } - private HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { + private OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { TcpReader tcpReader = new TcpReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TcpReader, Distributor<IMonitoringRecord>>("TCP reader pipeline"); + OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>>("TCP reader pipeline"); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(distributor); return pipeline; } - private HeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { + private OldHeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); clock.setInitialDelayInMs(intervalDelayInMs); clock.setIntervalDelayInMs(intervalDelayInMs); @@ -111,7 +111,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - HeadPipeline<Clock, Distributor<Long>> pipeline = new HeadPipeline<Clock, Distributor<Long>>(); + OldHeadPipeline<Clock, Distributor<Long>> pipeline = new OldHeadPipeline<Clock, Distributor<Long>>(); pipeline.setFirstStage(clock); pipeline.setLastStage(distributor); return pipeline; @@ -147,7 +147,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal } } - private HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, + private OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); @@ -183,7 +183,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal SingleElementPipe.connect(traceCounter.getOutputPort(), endStage.getInputPort()); // create and configure pipeline - HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>( + OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>( "Worker pipeline"); pipeline.setFirstStage(relay); // pipeline.addIntermediateStage(sysout); diff --git a/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java index e74fba89..daef09de 100644 --- a/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java +++ b/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java @@ -8,7 +8,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import teetime.framework.HeadPipeline; +import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableStage; import teetime.framework.Stage; import teetime.framework.pipe.SingleElementPipe; @@ -52,38 +52,38 @@ public class TcpTraceReductionAnalysisWithThreads { private int numWorkerThreads; public void init() { - HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); - HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); + OldHeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); this.clockThread = new Thread(new RunnableStage(clockStage)); - HeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(5000); + OldHeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(5000); this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); this.workerThreads = new Thread[this.numWorkerThreads]; for (int i = 0; i < this.workerThreads.length; i++) { - HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); + OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); } } - private HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { + private OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { TcpReader tcpReader = new TcpReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TcpReader, Distributor<IMonitoringRecord>>(); + OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(distributor); return pipeline; } - private HeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { + private OldHeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); clock.setInitialDelayInMs(intervalDelayInMs); clock.setIntervalDelayInMs(intervalDelayInMs); @@ -92,7 +92,7 @@ public class TcpTraceReductionAnalysisWithThreads { SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - HeadPipeline<Clock, Distributor<Long>> pipeline = new HeadPipeline<Clock, Distributor<Long>>(); + OldHeadPipeline<Clock, Distributor<Long>> pipeline = new OldHeadPipeline<Clock, Distributor<Long>>(); pipeline.setFirstStage(clock); pipeline.setLastStage(distributor); return pipeline; @@ -152,10 +152,10 @@ public class TcpTraceReductionAnalysisWithThreads { } } - private HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline( - final HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpReaderPipeline, - final HeadPipeline<Clock, Distributor<Long>> clockStage, - final HeadPipeline<Clock, Distributor<Long>> clock2Stage) { + private OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline( + final OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpReaderPipeline, + final OldHeadPipeline<Clock, Distributor<Long>> clockStage, + final OldHeadPipeline<Clock, Distributor<Long>> clock2Stage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); @@ -188,7 +188,7 @@ public class TcpTraceReductionAnalysisWithThreads { SpScPipe.connect(clockStage.getLastStage().getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); // create and configure pipeline - HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); + OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); pipeline.setFirstStage(relay); pipeline.setLastStage(endStage); return pipeline; -- GitLab