diff --git a/src/main/java/teetime/stage/className/ClassNameRegistryCreationFilter.java b/src/main/java/teetime/stage/className/ClassNameRegistryCreationFilter.java index ee175d08512ce0685bf25a53d3740b775ddf1864..e8c3a29b251eacb7d64da5f63f882342abfe0b5e 100644 --- a/src/main/java/teetime/stage/className/ClassNameRegistryCreationFilter.java +++ b/src/main/java/teetime/stage/className/ClassNameRegistryCreationFilter.java @@ -19,15 +19,15 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; -import teetime.framework.ConsumerStage; +import teetime.framework.AbstractConsumerStage; import teetime.framework.OutputPort; /** * @author Christian Wulf - * + * * @since 1.10 */ -public class ClassNameRegistryCreationFilter extends ConsumerStage<File> { +public class ClassNameRegistryCreationFilter extends AbstractConsumerStage<File> { private final OutputPort<File> outputPort = this.createOutputPort(); diff --git a/src/main/java/teetime/stage/explorviz/KiekerRecordTcpReader.java b/src/main/java/teetime/stage/explorviz/KiekerRecordTcpReader.java index cd0aa5162e8f70ae2987b029fd3aac244fe35dd6..8fc450e755952b733cdfb663226189815f184393 100644 --- a/src/main/java/teetime/stage/explorviz/KiekerRecordTcpReader.java +++ b/src/main/java/teetime/stage/explorviz/KiekerRecordTcpReader.java @@ -9,7 +9,7 @@ import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; -import teetime.framework.ProducerStage; +import teetime.framework.AbstractProducerStage; import kieker.common.record.IMonitoringRecord; import kieker.common.record.flow.trace.operation.AfterOperationEvent; @@ -17,7 +17,7 @@ import kieker.common.record.flow.trace.operation.BeforeOperationEvent; import kieker.common.util.registry.ILookup; import kieker.common.util.registry.Lookup; -public class KiekerRecordTcpReader extends ProducerStage<IMonitoringRecord> { +public class KiekerRecordTcpReader extends AbstractProducerStage<IMonitoringRecord> { private static final int MESSAGE_BUFFER_SIZE = 65535; diff --git a/src/main/java/teetime/stage/io/database/DbReader.java b/src/main/java/teetime/stage/io/database/DbReader.java index 076acb3d322d4c1d9aa64c2e4a921641c6a8e76a..3c3d6648c0615bf0c6945cb99b414b71e56539eb 100644 --- a/src/main/java/teetime/stage/io/database/DbReader.java +++ b/src/main/java/teetime/stage/io/database/DbReader.java @@ -22,7 +22,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import teetime.framework.ProducerStage; +import teetime.framework.AbstractProducerStage; import kieker.common.exception.MonitoringRecordException; import kieker.common.record.AbstractMonitoringRecord; @@ -36,7 +36,7 @@ import kieker.common.record.IMonitoringRecord; * @since 1.10 */ // @Description("A reader which reads records from a database") -public class DbReader extends ProducerStage<IMonitoringRecord> { +public class DbReader extends AbstractProducerStage<IMonitoringRecord> { /** * The classname of the driver used for the connection. diff --git a/src/main/java/teetime/stage/io/filesystem/Dir2RecordsFilter.java b/src/main/java/teetime/stage/io/filesystem/Dir2RecordsFilter.java index d5749921d2d15bab2ec9548abbfac6bb8fae7d43..899ff1b6df4a746a0cc442c272b7d7d221d6d37a 100644 --- a/src/main/java/teetime/stage/io/filesystem/Dir2RecordsFilter.java +++ b/src/main/java/teetime/stage/io/filesystem/Dir2RecordsFilter.java @@ -17,8 +17,8 @@ package teetime.stage.io.filesystem; import java.io.File; +import teetime.framework.AbstractStage; import teetime.framework.InputPort; -import teetime.framework.OldPipeline; import teetime.framework.OutputPort; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry; @@ -41,10 +41,12 @@ import kieker.common.util.filesystem.FSUtil; * * @since 1.10 */ -public class Dir2RecordsFilter extends OldPipeline<ClassNameRegistryCreationFilter, Merger<IMonitoringRecord>> { +public class Dir2RecordsFilter extends AbstractStage { private final PipeFactoryRegistry pipeFactoryRegistry = PipeFactoryRegistry.INSTANCE; private ClassNameRegistryRepository classNameRegistryRepository; + private final ClassNameRegistryCreationFilter classNameRegistryCreationFilter; + private final Merger<IMonitoringRecord> recordMerger; /** * @since 1.10 @@ -80,8 +82,8 @@ public class Dir2RecordsFilter extends OldPipeline<ClassNameRegistryCreationFilt pipeFactory.create(binaryFile2RecordFilter.getOutputPort(), recordMerger.getNewInputPort()); // prepare pipeline - this.setFirstStage(classNameRegistryCreationFilter); - this.setLastStage(recordMerger); + this.classNameRegistryCreationFilter = classNameRegistryCreationFilter; + this.recordMerger = recordMerger; } /** @@ -100,11 +102,16 @@ public class Dir2RecordsFilter extends OldPipeline<ClassNameRegistryCreationFilt } public InputPort<File> getInputPort() { - return this.getFirstStage().getInputPort(); + return classNameRegistryCreationFilter.getInputPort(); } public OutputPort<IMonitoringRecord> getOutputPort() { - return this.getLastStage().getOutputPort(); + return recordMerger.getOutputPort(); + } + + @Override + public void executeWithPorts() { + classNameRegistryCreationFilter.executeWithPorts(); } } diff --git a/src/main/java/teetime/stage/io/filesystem/format/binary/DirWithBin2RecordFilter.java b/src/main/java/teetime/stage/io/filesystem/format/binary/DirWithBin2RecordFilter.java index feb0cb4f9f047aeeddda9ff7d4b3c4ee3d9f7145..341d316d565902bcdffbfcd8b69be515e3729523 100644 --- a/src/main/java/teetime/stage/io/filesystem/format/binary/DirWithBin2RecordFilter.java +++ b/src/main/java/teetime/stage/io/filesystem/format/binary/DirWithBin2RecordFilter.java @@ -2,8 +2,8 @@ package teetime.stage.io.filesystem.format.binary; import java.io.File; +import teetime.framework.AbstractStage; import teetime.framework.InputPort; -import teetime.framework.OldPipeline; import teetime.framework.OutputPort; import teetime.stage.className.ClassNameRegistryCreationFilter; import teetime.stage.className.ClassNameRegistryRepository; @@ -11,18 +11,17 @@ import teetime.stage.io.filesystem.format.binary.file.BinaryFile2RecordFilter; import kieker.common.record.IMonitoringRecord; -public class DirWithBin2RecordFilter extends OldPipeline<ClassNameRegistryCreationFilter, BinaryFile2RecordFilter> { +public class DirWithBin2RecordFilter extends AbstractStage { private ClassNameRegistryRepository classNameRegistryRepository; + private final ClassNameRegistryCreationFilter classNameRegistryCreationFilter; + private final BinaryFile2RecordFilter binaryFile2RecordFilter; public DirWithBin2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) { this.classNameRegistryRepository = classNameRegistryRepository; - final ClassNameRegistryCreationFilter classNameRegistryCreationFilter = new ClassNameRegistryCreationFilter(classNameRegistryRepository); - final BinaryFile2RecordFilter binaryFile2RecordFilter = new BinaryFile2RecordFilter(classNameRegistryRepository); - - this.setFirstStage(classNameRegistryCreationFilter); - this.setLastStage(binaryFile2RecordFilter); + classNameRegistryCreationFilter = new ClassNameRegistryCreationFilter(classNameRegistryRepository); + binaryFile2RecordFilter = new BinaryFile2RecordFilter(classNameRegistryRepository); } public DirWithBin2RecordFilter() { @@ -38,10 +37,15 @@ public class DirWithBin2RecordFilter extends OldPipeline<ClassNameRegistryCreati } public InputPort<File> getInputPort() { - return this.getFirstStage().getInputPort(); + return this.classNameRegistryCreationFilter.getInputPort(); } public OutputPort<IMonitoringRecord> getOutputPort() { - return this.getLastStage().getOutputPort(); + return this.binaryFile2RecordFilter.getOutputPort(); + } + + @Override + public void executeWithPorts() { + classNameRegistryCreationFilter.executeWithPorts(); } } diff --git a/src/main/java/teetime/stage/io/filesystem/format/binary/file/BinaryFile2RecordFilter.java b/src/main/java/teetime/stage/io/filesystem/format/binary/file/BinaryFile2RecordFilter.java index 24af3af29d8d439113bb75b250283f02f2afb593..7967005050b7dc0a58172a5c90e90f51e0aa93e3 100644 --- a/src/main/java/teetime/stage/io/filesystem/format/binary/file/BinaryFile2RecordFilter.java +++ b/src/main/java/teetime/stage/io/filesystem/format/binary/file/BinaryFile2RecordFilter.java @@ -19,7 +19,7 @@ import java.io.DataInputStream; import java.io.File; import java.io.IOException; -import teetime.framework.ConsumerStage; +import teetime.framework.AbstractConsumerStage; import teetime.framework.OutputPort; import teetime.stage.className.ClassNameRegistryRepository; @@ -32,7 +32,7 @@ import kieker.common.util.filesystem.BinaryCompressionMethod; * * @since 1.10 */ -public class BinaryFile2RecordFilter extends ConsumerStage<File> { +public class BinaryFile2RecordFilter extends AbstractConsumerStage<File> { private final OutputPort<IMonitoringRecord> outputPort = this.createOutputPort(); diff --git a/src/main/java/teetime/stage/io/filesystem/format/text/DirWithDat2RecordFilter.java b/src/main/java/teetime/stage/io/filesystem/format/text/DirWithDat2RecordFilter.java index 1db084810d3f10f804cb77e11e7597da878ae977..9c45050682515243c0dab3b88d1440399edc47e3 100644 --- a/src/main/java/teetime/stage/io/filesystem/format/text/DirWithDat2RecordFilter.java +++ b/src/main/java/teetime/stage/io/filesystem/format/text/DirWithDat2RecordFilter.java @@ -2,8 +2,8 @@ package teetime.stage.io.filesystem.format.text; import java.io.File; +import teetime.framework.AbstractStage; import teetime.framework.InputPort; -import teetime.framework.OldPipeline; import teetime.framework.OutputPort; import teetime.stage.className.ClassNameRegistryCreationFilter; import teetime.stage.className.ClassNameRegistryRepository; @@ -11,18 +11,17 @@ import teetime.stage.io.filesystem.format.text.file.DatFile2RecordFilter; import kieker.common.record.IMonitoringRecord; -public class DirWithDat2RecordFilter extends OldPipeline<ClassNameRegistryCreationFilter, DatFile2RecordFilter> { +public class DirWithDat2RecordFilter extends AbstractStage { private ClassNameRegistryRepository classNameRegistryRepository; + private final ClassNameRegistryCreationFilter classNameRegistryCreationFilter; + private final DatFile2RecordFilter datFile2RecordFilter; public DirWithDat2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) { this.classNameRegistryRepository = classNameRegistryRepository; - final ClassNameRegistryCreationFilter classNameRegistryCreationFilter = new ClassNameRegistryCreationFilter(classNameRegistryRepository); - final DatFile2RecordFilter datFile2RecordFilter = new DatFile2RecordFilter(classNameRegistryRepository); - - this.setFirstStage(classNameRegistryCreationFilter); - this.setLastStage(datFile2RecordFilter); + classNameRegistryCreationFilter = new ClassNameRegistryCreationFilter(classNameRegistryRepository); + datFile2RecordFilter = new DatFile2RecordFilter(classNameRegistryRepository); } public DirWithDat2RecordFilter() { @@ -38,10 +37,16 @@ public class DirWithDat2RecordFilter extends OldPipeline<ClassNameRegistryCreati } public InputPort<File> getInputPort() { - return this.getFirstStage().getInputPort(); + return this.classNameRegistryCreationFilter.getInputPort(); } public OutputPort<IMonitoringRecord> getOutputPort() { - return this.getLastStage().getOutputPort(); + return this.datFile2RecordFilter.getOutputPort(); + } + + @Override + public void executeWithPorts() { + this.classNameRegistryCreationFilter.executeWithPorts(); } + } diff --git a/src/main/java/teetime/stage/io/filesystem/format/text/file/DatFile2RecordFilter.java b/src/main/java/teetime/stage/io/filesystem/format/text/file/DatFile2RecordFilter.java index fba4593100a19579b5632b4fce6deeb92812fd75..c2ac888a9799473ad406eeed34d217e8d223221e 100644 --- a/src/main/java/teetime/stage/io/filesystem/format/text/file/DatFile2RecordFilter.java +++ b/src/main/java/teetime/stage/io/filesystem/format/text/file/DatFile2RecordFilter.java @@ -17,9 +17,9 @@ package teetime.stage.io.filesystem.format.text.file; import java.io.File; +import teetime.framework.AbstractStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.OldPipeline; import teetime.framework.pipe.SingleElementPipe; import teetime.stage.className.ClassNameRegistryRepository; import teetime.stage.io.File2TextLinesFilter; @@ -31,24 +31,30 @@ import kieker.common.record.IMonitoringRecord; * * @since 1.10 */ -public class DatFile2RecordFilter extends OldPipeline<File2TextLinesFilter, TextLine2RecordFilter> { +public class DatFile2RecordFilter extends AbstractStage { - public DatFile2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) { - File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter(); - TextLine2RecordFilter textLine2RecordFilter = new TextLine2RecordFilter(classNameRegistryRepository); + private final File2TextLinesFilter file2TextLinesFilter; + private final TextLine2RecordFilter textLine2RecordFilter; - this.setFirstStage(file2TextLinesFilter); - this.setLastStage(textLine2RecordFilter); + public DatFile2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) { + file2TextLinesFilter = new File2TextLinesFilter(); + textLine2RecordFilter = new TextLine2RecordFilter(classNameRegistryRepository); // BETTER let the framework choose the optimal pipe implementation SingleElementPipe.connect(file2TextLinesFilter.getOutputPort(), textLine2RecordFilter.getInputPort()); } public InputPort<File> getInputPort() { - return this.getFirstStage().getInputPort(); + return this.file2TextLinesFilter.getInputPort(); } public OutputPort<IMonitoringRecord> getOutputPort() { - return this.getLastStage().getOutputPort(); + return this.textLine2RecordFilter.getOutputPort(); } + + @Override + public void executeWithPorts() { + file2TextLinesFilter.executeWithPorts(); + } + } diff --git a/src/main/java/teetime/stage/io/filesystem/format/text/file/TextLine2MappingRegistryFilter.java b/src/main/java/teetime/stage/io/filesystem/format/text/file/TextLine2MappingRegistryFilter.java index e2ee19c803d2c8dded445e421d90323d9a28cdbc..bb128cd6d00627099b6aa590b8d2e2b757e5c71e 100644 --- a/src/main/java/teetime/stage/io/filesystem/format/text/file/TextLine2MappingRegistryFilter.java +++ b/src/main/java/teetime/stage/io/filesystem/format/text/file/TextLine2MappingRegistryFilter.java @@ -18,16 +18,16 @@ package teetime.stage.io.filesystem.format.text.file; import java.util.Map; -import teetime.framework.ConsumerStage; +import teetime.framework.AbstractConsumerStage; import kieker.common.util.filesystem.FSUtil; /** * @author Christian Wulf - * + * * @since 1.10 */ -public class TextLine2MappingRegistryFilter extends ConsumerStage<String> { +public class TextLine2MappingRegistryFilter extends AbstractConsumerStage<String> { private final Map<Integer, String> stringRegistry; diff --git a/src/main/java/teetime/stage/io/filesystem/format/text/file/TextLine2RecordFilter.java b/src/main/java/teetime/stage/io/filesystem/format/text/file/TextLine2RecordFilter.java index a39985f037404947d6a60b879f189608a9299b62..a165b77a1fae7f4f29f8258375f6eb1ede21d36a 100644 --- a/src/main/java/teetime/stage/io/filesystem/format/text/file/TextLine2RecordFilter.java +++ b/src/main/java/teetime/stage/io/filesystem/format/text/file/TextLine2RecordFilter.java @@ -19,7 +19,7 @@ package teetime.stage.io.filesystem.format.text.file; import java.util.HashSet; import java.util.Set; -import teetime.framework.ConsumerStage; +import teetime.framework.AbstractConsumerStage; import teetime.framework.OutputPort; import teetime.stage.className.ClassNameRegistryRepository; import teetime.stage.util.MappingException; @@ -35,7 +35,7 @@ import kieker.common.record.IMonitoringRecord; * * @since 1.10 */ -public class TextLine2RecordFilter extends ConsumerStage<TextLine> { +public class TextLine2RecordFilter extends AbstractConsumerStage<TextLine> { private final OutputPort<IMonitoringRecord> outputPort = this.createOutputPort(); diff --git a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java b/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java index 6e2d9ac4e846798b594fad5b84ae76f5dc257ef4..73a8e99c03d1e978a1a76df9851b379304872d70 100644 --- a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java +++ b/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java @@ -18,7 +18,7 @@ package teetime.stage.stringBuffer; import java.util.Collection; import java.util.LinkedList; -import teetime.framework.ConsumerStage; +import teetime.framework.AbstractConsumerStage; import teetime.framework.OutputPort; import teetime.stage.stringBuffer.handler.AbstractDataTypeHandler; import teetime.stage.stringBuffer.util.KiekerHashMap; @@ -28,7 +28,7 @@ import teetime.stage.stringBuffer.util.KiekerHashMap; * * @since 1.10 */ -public class StringBufferFilter<T> extends ConsumerStage<T> { +public class StringBufferFilter<T> extends AbstractConsumerStage<T> { private final OutputPort<T> outputPort = this.createOutputPort(); diff --git a/src/main/java/teetime/stage/trace/traceReconstruction/TraceReconstructionFilter.java b/src/main/java/teetime/stage/trace/traceReconstruction/TraceReconstructionFilter.java index 14ae83e98d301bf2526f4943580f4bfea0b4d9f6..3652abe7b2c6f9c0ca82bf0a426c0c02d9e852ea 100644 --- a/src/main/java/teetime/stage/trace/traceReconstruction/TraceReconstructionFilter.java +++ b/src/main/java/teetime/stage/trace/traceReconstruction/TraceReconstructionFilter.java @@ -17,7 +17,7 @@ package teetime.stage.trace.traceReconstruction; import java.util.concurrent.TimeUnit; -import teetime.framework.ConsumerStage; +import teetime.framework.AbstractConsumerStage; import teetime.framework.OutputPort; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; @@ -32,7 +32,7 @@ import kieker.common.record.flow.trace.TraceMetadata; * * @since 1.10 */ -public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> { +public class TraceReconstructionFilter extends AbstractConsumerStage<IFlowRecord> { private final OutputPort<TraceEventRecords> traceValidOutputPort = this.createOutputPort(); private final OutputPort<TraceEventRecords> traceInvalidOutputPort = this.createOutputPort(); diff --git a/src/main/java/teetime/stage/trace/traceReduction/TraceReductionFilter.java b/src/main/java/teetime/stage/trace/traceReduction/TraceReductionFilter.java index 5c64c64f6499cc8c15e5c0748ebb7deec8e03acb..3f5dda019ed7819069ec525dbeb14774227f998b 100644 --- a/src/main/java/teetime/stage/trace/traceReduction/TraceReductionFilter.java +++ b/src/main/java/teetime/stage/trace/traceReduction/TraceReductionFilter.java @@ -20,7 +20,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; -import teetime.framework.ConsumerStage; +import teetime.framework.AbstractConsumerStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; @@ -37,7 +37,7 @@ import kieker.analysis.plugin.filter.flow.TraceEventRecords; * * @since */ -public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> { +public class TraceReductionFilter extends AbstractConsumerStage<TraceEventRecords> { private final InputPort<Long> triggerInputPort = this.createInputPort(); private final OutputPort<TraceEventRecords> outputPort = this.createOutputPort(); diff --git a/src/main/java/util/KiekerLoadDriver.java b/src/main/java/util/KiekerLoadDriver.java index 3fa220d19997be63b189e0bf25ff8eb2a15add35..409bb4bd37aae5f09a06fbdd46ae1232019887b5 100644 --- a/src/main/java/util/KiekerLoadDriver.java +++ b/src/main/java/util/KiekerLoadDriver.java @@ -14,8 +14,7 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; -import teetime.framework.OldHeadPipeline; -import teetime.framework.HeadStage; +import teetime.framework.IStage; import teetime.framework.RunnableStage; import teetime.framework.pipe.SingleElementPipe; import teetime.stage.CollectorSink; @@ -36,25 +35,21 @@ public class KiekerLoadDriver { private long[] timings; public KiekerLoadDriver(final File directory) { - HeadStage producerPipeline = this.buildProducerPipeline(directory); + IStage producerPipeline = this.buildProducerPipeline(directory); this.runnableStage = new RunnableStage(producerPipeline); } - private OldHeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>> buildProducerPipeline(final File directory) { + private IStage buildProducerPipeline(final File directory) { ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository(); // create stages InitialElementProducer<File> initialElementProducer = new InitialElementProducer<File>(directory); Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository); CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection); - final OldHeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>> pipeline = new OldHeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>>(); - pipeline.setFirstStage(initialElementProducer); - pipeline.setLastStage(collector); - SingleElementPipe.connect(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort()); SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), collector.getInputPort()); - return pipeline; + return initialElementProducer; } public Collection<IMonitoringRecord> load() { diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLogging.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLogging.java index 3820209e4a7a54f5f816b44ce1e9fb9cce6f0049..2bd12c2b82e9a3e93275f3e930df789625ac5a51 100644 --- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLogging.java +++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLogging.java @@ -1,6 +1,6 @@ package teetime.examples.kiekerdays; -import teetime.framework.HeadStage; +import teetime.framework.IStage; import teetime.framework.RunnableStage; import teetime.stage.io.network.TcpReader; @@ -9,7 +9,7 @@ public class TcpTraceLogging { private Thread tcpThread; public void init() { - HeadStage tcpPipeline = this.buildTcpPipeline(); + IStage tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); } @@ -24,7 +24,7 @@ public class TcpTraceLogging { } } - private HeadStage buildTcpPipeline() { + private IStage buildTcpPipeline() { // TCPReaderSink tcpReader = new TCPReaderSink(); TcpReader tcpReader = new TcpReader(); diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingExplorviz.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingExplorviz.java index f97afc82e5e58ab5aa51826a39d339f63143bd58..3c9ddeaf9e9079b86cec6d89d85d403b4e957aec 100644 --- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingExplorviz.java +++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingExplorviz.java @@ -1,7 +1,6 @@ package teetime.examples.kiekerdays; -import teetime.framework.OldHeadPipeline; -import teetime.framework.HeadStage; +import teetime.framework.IStage; import teetime.framework.RunnableStage; import teetime.framework.pipe.SingleElementPipe; import teetime.stage.basic.Sink; @@ -14,7 +13,7 @@ public class TcpTraceLoggingExplorviz { private Thread tcpThread; public void init() { - HeadStage tcpPipeline = this.buildTcpPipeline(); + IStage tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); } @@ -29,16 +28,12 @@ public class TcpTraceLoggingExplorviz { } } - private HeadStage buildTcpPipeline() { + private IStage buildTcpPipeline() { KiekerRecordTcpReader tcpReader = new KiekerRecordTcpReader(); Sink<IMonitoringRecord> endStage = new Sink<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort()); - // create and configure pipeline - OldHeadPipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>> pipeline = new OldHeadPipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>>(); - pipeline.setFirstStage(tcpReader); - pipeline.setLastStage(endStage); return tcpReader; } diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java index 46b62adcf28fe567ec117546cfacf2946dd6b00e..cf8bf96fef4b31ee5f190e2e4df0b2a9df96898b 100644 --- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java +++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java @@ -4,8 +4,7 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; -import teetime.framework.OldHeadPipeline; -import teetime.framework.HeadStage; +import teetime.framework.IStage; import teetime.framework.RunnableStage; import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SpScPipe; @@ -38,32 +37,28 @@ public class TcpTraceReconstruction { private int numWorkerThreads; public void init() { - OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + IStage tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); this.workerThreads = new Thread[this.numWorkerThreads]; for (int i = 0; i < this.workerThreads.length; i++) { - HeadStage pipeline = this.buildPipeline(tcpPipeline.getLastStage()); + IStage pipeline = this.buildPipeline(tcpPipeline.getLastStage()); this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); } } - private OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { + private IStage buildTcpPipeline() { TcpReader tcpReader = new TcpReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); - // create and configure pipeline - OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>>(); - pipeline.setFirstStage(tcpReader); - pipeline.setLastStage(distributor); - return pipeline; + return tcpReader; } - private HeadStage buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline) { + private IStage buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( @@ -79,11 +74,7 @@ public class TcpTraceReconstruction { SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), endStage.getInputPort()); - // create and configure pipeline - OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); - pipeline.setFirstStage(relay); - pipeline.setLastStage(endStage); - return pipeline; + return relay; } public void start() { diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java index e8649f6624026454b633a61ab4e2d88817ea06f1..9e7c9e09e372dc56ad5f81d45c78a92d00a55990 100644 --- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java +++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java @@ -6,8 +6,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import teetime.framework.OldHeadPipeline; -import teetime.framework.HeadStage; +import teetime.framework.IStage; import teetime.framework.RunnableStage; import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SpScPipe; @@ -46,35 +45,31 @@ public class TcpTraceReduction { private int numWorkerThreads; public void init() { - OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + IStage tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); - OldHeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(5000); + IStage clockStage = this.buildClockPipeline(5000); this.clockThread = new Thread(new RunnableStage(clockStage)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); this.workerThreads = new Thread[this.numWorkerThreads]; for (int i = 0; i < this.workerThreads.length; i++) { - HeadStage pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage()); + IStage pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage()); this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); } } - private OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { + private IStage buildTcpPipeline() { TcpReader tcpReader = new TcpReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); - // create and configure pipeline - OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>>(); - pipeline.setFirstStage(tcpReader); - pipeline.setLastStage(distributor); - return pipeline; + return tcpReader; } - private OldHeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { + private IStage buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); clock.setInitialDelayInMs(intervalDelayInMs); clock.setIntervalDelayInMs(intervalDelayInMs); @@ -82,14 +77,10 @@ public class TcpTraceReduction { SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); - // create and configure pipeline - OldHeadPipeline<Clock, Distributor<Long>> pipeline = new OldHeadPipeline<Clock, Distributor<Long>>(); - pipeline.setFirstStage(clock); - pipeline.setLastStage(distributor); - return pipeline; + return clock; } - private HeadStage buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, final Distributor<Long> clockStage) { + private IStage buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, final Distributor<Long> clockStage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( @@ -109,11 +100,7 @@ public class TcpTraceReduction { SpScPipe.connect(clockStage.getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10); - // create and configure pipeline - OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); - pipeline.setFirstStage(relay); - pipeline.setLastStage(endStage); - return pipeline; + return relay; } public void start() { diff --git a/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java b/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java index 955057cbb4d3257ec8115247be1261573710bcdd..0f8000e89c1dc59ada01d189c098e65cbd8ff899 100644 --- a/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java +++ b/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java @@ -20,7 +20,7 @@ import java.util.LinkedList; import java.util.List; import teetime.framework.AnalysisConfiguration; -import teetime.framework.HeadStage; +import teetime.framework.IStage; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.CollectorSink; @@ -44,11 +44,11 @@ public class RecordReaderConfiguration extends AnalysisConfiguration { } private void buildConfiguration() { - HeadStage producerPipeline = this.buildProducerPipeline(); + IStage producerPipeline = this.buildProducerPipeline(); this.getFiniteProducerStages().add(producerPipeline); } - private HeadStage buildProducerPipeline() { + private IStage buildProducerPipeline() { ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository(); File logDir = new File("src/test/data/bookstore-logs"); // create stages diff --git a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java index d4b0690871e9ee398f838dcac7adc8ceaf7066fc..6ab3f82485f504554ed06a01ab5ddc564b0e1988 100644 --- a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java +++ b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java @@ -2,7 +2,7 @@ package teetime.examples.traceReading; import java.util.List; -import teetime.framework.OldHeadPipeline; +import teetime.framework.IStage; import teetime.framework.RunnableStage; import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SpScPipe; @@ -23,7 +23,7 @@ public class TcpTraceLoggingExtAnalysis { private Counter<IMonitoringRecord> recordCounter; private ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputStage; - private OldHeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { + private IStage buildClockPipeline(final long intervalDelayInMs) { Clock clockStage = new Clock(); clockStage.setInitialDelayInMs(intervalDelayInMs); clockStage.setIntervalDelayInMs(intervalDelayInMs); @@ -31,14 +31,10 @@ public class TcpTraceLoggingExtAnalysis { SingleElementPipe.connect(clockStage.getOutputPort(), distributor.getInputPort()); - // create and configure pipeline - OldHeadPipeline<Clock, Distributor<Long>> pipeline = new OldHeadPipeline<Clock, Distributor<Long>>(); - pipeline.setFirstStage(clockStage); - pipeline.setLastStage(distributor); - return pipeline; + return clockStage; } - private OldHeadPipeline<?, ?> buildTcpPipeline(final Distributor<Long> previousClockStage) { + private IStage buildTcpPipeline(final Distributor<Long> previousClockStage) { TcpReader tcpReader = new TcpReader(); this.recordCounter = new Counter<IMonitoringRecord>(); this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>(); @@ -51,19 +47,14 @@ public class TcpTraceLoggingExtAnalysis { SpScPipe.connect(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10); - // create and configure pipeline - OldHeadPipeline<TcpReader, Sink<IMonitoringRecord>> pipeline = new OldHeadPipeline<TcpReader, Sink<IMonitoringRecord>>(); - pipeline.setFirstStage(tcpReader); - pipeline.setLastStage(endStage); - return pipeline; + return tcpReader; } public void init() { - - OldHeadPipeline<Clock, Distributor<Long>> clockPipeline = this.buildClockPipeline(1000); + IStage clockPipeline = this.buildClockPipeline(1000); this.clockThread = new Thread(new RunnableStage(clockPipeline)); - OldHeadPipeline<?, ?> tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage()); + IStage tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage()); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); } diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java index 9a4cfff81934ac20b84df6165bf6f83a6e4670ed..abeb2fa9409d9bb7dc5fc7a630dccc1b56d7d4b4 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java @@ -3,7 +3,7 @@ package teetime.examples.traceReconstruction; import java.util.LinkedList; import java.util.List; -import teetime.framework.OldHeadPipeline; +import teetime.framework.IStage; import teetime.framework.RunnableStage; import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SpScPipe; @@ -41,31 +41,27 @@ public class TcpTraceReconstructionAnalysis { private ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter; public void init() { - OldHeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); + IStage clockStage = this.buildClockPipeline(1000); this.clockThread = new Thread(new RunnableStage(clockStage)); - OldHeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); + IStage clock2Stage = this.buildClockPipeline(2000); this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); - OldHeadPipeline<?, ?> pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage()); + IStage pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage()); this.workerThread = new Thread(new RunnableStage(pipeline)); } - private OldHeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { + private IStage buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); clock.setIntervalDelayInMs(intervalDelayInMs); Distributor<Long> distributor = new Distributor<Long>(); SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); - // create and configure pipeline - OldHeadPipeline<Clock, Distributor<Long>> pipeline = new OldHeadPipeline<Clock, Distributor<Long>>(); - pipeline.setFirstStage(clock); - pipeline.setLastStage(distributor); - return pipeline; + return clock; } - private OldHeadPipeline<TcpReader, Sink<TraceEventRecords>> buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { + private IStage buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { // create stages TcpReader tcpReader = new TcpReader(); this.recordCounter = new Counter<IMonitoringRecord>(); @@ -91,11 +87,7 @@ public class TcpTraceReconstructionAnalysis { SpScPipe.connect(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10); SpScPipe.connect(clock2Stage.getNewOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10); - // create and configure pipeline - OldHeadPipeline<TcpReader, Sink<TraceEventRecords>> pipeline = new OldHeadPipeline<TcpReader, Sink<TraceEventRecords>>(); - pipeline.setFirstStage(tcpReader); - pipeline.setLastStage(endStage); - return pipeline; + return tcpReader; } public void start() { diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java index 1c171f1f3e85a437bdacb737a986095de3ad1d1a..d7b33832a27b9453d03c8f973c16f65fccfe74c6 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java @@ -4,7 +4,7 @@ import java.io.File; import java.util.LinkedList; import java.util.List; -import teetime.framework.OldHeadPipeline; +import teetime.framework.IStage; import teetime.framework.RunnableStage; import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SpScPipe; @@ -49,7 +49,7 @@ public class TraceReconstructionAnalysis { Clock clockStage = this.buildClockPipeline(); this.clockThread = new Thread(new RunnableStage(clockStage)); - OldHeadPipeline<?, ?> pipeline = this.buildPipeline(clockStage); + IStage pipeline = this.buildPipeline(clockStage); this.workerThread = new Thread(new RunnableStage(pipeline)); } @@ -60,7 +60,7 @@ public class TraceReconstructionAnalysis { return clock; } - private OldHeadPipeline<?, ?> buildPipeline(final Clock clockStage) { + private IStage buildPipeline(final Clock clockStage) { this.classNameRegistryRepository = new ClassNameRegistryRepository(); // create stages @@ -98,11 +98,7 @@ public class TraceReconstructionAnalysis { SpScPipe.connect(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1); - // create and configure pipeline - OldHeadPipeline<InitialElementProducer<File>, CollectorSink<TraceEventRecords>> pipeline = new OldHeadPipeline<InitialElementProducer<File>, CollectorSink<TraceEventRecords>>(); - pipeline.setFirstStage(initialElementProducer); - pipeline.setLastStage(collector); - return pipeline; + return initialElementProducer; } public void start() { diff --git a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java index 0169a2ce1e661bb31b151bceee36fc5cf668008c..15c606b430db830a75fbb960537bc8521a7ab280 100644 --- a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java +++ b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java @@ -6,9 +6,9 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import teetime.framework.AbstractStage; import teetime.framework.AnalysisConfiguration; -import teetime.framework.OldHeadPipeline; -import teetime.framework.Stage; +import teetime.framework.IStage; import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SpScPipe; import teetime.stage.Clock; @@ -72,37 +72,32 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal } public void buildConfiguration() { - final OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + final IStage tcpPipeline = this.buildTcpPipeline(); this.getFiniteProducerStages().add(tcpPipeline); - final OldHeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); + final IStage clockStage = this.buildClockPipeline(1000); this.getInfiniteProducerStages().add(clockStage); - final OldHeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); + final IStage clock2Stage = this.buildClockPipeline(2000); this.getInfiniteProducerStages().add(clock2Stage); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); for (int i = 0; i < this.numWorkerThreads; i++) { - OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), - clock2Stage.getLastStage()); + IStage pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), clock2Stage.getLastStage()); this.getConsumerStages().add(pipeline); } } - private OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { + private IStage buildTcpPipeline() { TcpReader tcpReader = new TcpReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); - // create and configure pipeline - OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>>("TCP reader pipeline"); - pipeline.setFirstStage(tcpReader); - pipeline.setLastStage(distributor); - return pipeline; + return tcpReader; } - private OldHeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { + private IStage buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); clock.setInitialDelayInMs(intervalDelayInMs); clock.setIntervalDelayInMs(intervalDelayInMs); @@ -110,14 +105,10 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); - // create and configure pipeline - OldHeadPipeline<Clock, Distributor<Long>> pipeline = new OldHeadPipeline<Clock, Distributor<Long>>(); - pipeline.setFirstStage(clock); - pipeline.setLastStage(distributor); - return pipeline; + return clock; } - private static class StageFactory<T extends Stage> { + private static class StageFactory<T extends AbstractStage> { private final Constructor<T> constructor; private final List<T> stages = new ArrayList<T>(); @@ -147,8 +138,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal } } - private OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, - final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { + private IStage buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); @@ -182,14 +172,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal // SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort()); SingleElementPipe.connect(traceCounter.getOutputPort(), endStage.getInputPort()); - // create and configure pipeline - OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>( - "Worker pipeline"); - pipeline.setFirstStage(relay); - // pipeline.addIntermediateStage(sysout); - pipeline.setLastStage(endStage); - - return pipeline; + return relay; } public List<TraceEventRecords> getElementCollection() { diff --git a/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java index daef09de3b32c0b45b2fd6083dbdc6a1e4366261..3fad6c93ef2e0f2205be234455f2113a9212a5bc 100644 --- a/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java +++ b/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java @@ -8,9 +8,9 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import teetime.framework.OldHeadPipeline; +import teetime.framework.AbstractStage; +import teetime.framework.IStage; import teetime.framework.RunnableStage; -import teetime.framework.Stage; import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SpScPipe; import teetime.stage.Clock; @@ -52,38 +52,34 @@ public class TcpTraceReductionAnalysisWithThreads { private int numWorkerThreads; public void init() { - OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + IStage tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); - OldHeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); + IStage clockStage = this.buildClockPipeline(1000); this.clockThread = new Thread(new RunnableStage(clockStage)); - OldHeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(5000); + IStage clock2Stage = this.buildClockPipeline(5000); this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); this.workerThreads = new Thread[this.numWorkerThreads]; for (int i = 0; i < this.workerThreads.length; i++) { - OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); + IStage pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); } } - private OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { + private IStage buildTcpPipeline() { TcpReader tcpReader = new TcpReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); - // create and configure pipeline - OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>>(); - pipeline.setFirstStage(tcpReader); - pipeline.setLastStage(distributor); - return pipeline; + return tcpReader; } - private OldHeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { + private IStage buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); clock.setInitialDelayInMs(intervalDelayInMs); clock.setIntervalDelayInMs(intervalDelayInMs); @@ -91,14 +87,10 @@ public class TcpTraceReductionAnalysisWithThreads { SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); - // create and configure pipeline - OldHeadPipeline<Clock, Distributor<Long>> pipeline = new OldHeadPipeline<Clock, Distributor<Long>>(); - pipeline.setFirstStage(clock); - pipeline.setLastStage(distributor); - return pipeline; + return clock; } - private static class StageFactory<T extends Stage> { + private static class StageFactory<T extends AbstractStage> { private final Constructor<T> constructor; private final List<T> stages = new ArrayList<T>(); @@ -152,10 +144,7 @@ public class TcpTraceReductionAnalysisWithThreads { } } - private OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline( - final OldHeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpReaderPipeline, - final OldHeadPipeline<Clock, Distributor<Long>> clockStage, - final OldHeadPipeline<Clock, Distributor<Long>> clock2Stage) { + private IStage buildPipeline(final IStage tcpReaderPipeline, final IStage clockStage, final IStage clock2Stage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); @@ -187,11 +176,7 @@ public class TcpTraceReductionAnalysisWithThreads { SpScPipe.connect(clock2Stage.getLastStage().getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10); SpScPipe.connect(clockStage.getLastStage().getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); - // create and configure pipeline - OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new OldHeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); - pipeline.setFirstStage(relay); - pipeline.setLastStage(endStage); - return pipeline; + return relay; } public void start() {