From 01fd0c1a96cb36a1882ab3bcf5696d09b8a0eaac Mon Sep 17 00:00:00 2001 From: Nelson Tavares de Sousa <stu103017@mail.uni-kiel.de> Date: Thu, 17 Sep 2015 14:06:18 +0200 Subject: [PATCH] adopted api changes (Merged AbstractStage) --- src/main/java/teetime/framework/Pipeline.java | 8 ++++---- .../stage/io/filesystem/Dir2RecordsFilter.java | 12 ++++++------ .../format/binary/DirWithBin2RecordFilter.java | 8 ++++---- .../format/text/file/DatFile2RecordFilter.java | 8 ++++---- .../recordReader/RecordReaderConfiguration.java | 10 +++++----- .../TcpTraceLoggingExtAnalysisConfiguration.java | 10 +++++----- .../TcpTraceReconstructionConf.java | 12 ++++++------ .../TraceReconstructionConf.java | 12 ++++++------ ...onstructionAnalysisWithThreadsConfiguration.java | 13 ++++++------- ...ceReductionAnalysisWithThreadsConfiguration.java | 13 ++++++------- src/test/java/dummy | 0 .../stage/io/filesystem/Dir2RecordsFilterTest.java | 8 ++++---- 12 files changed, 56 insertions(+), 58 deletions(-) delete mode 100644 src/test/java/dummy diff --git a/src/main/java/teetime/framework/Pipeline.java b/src/main/java/teetime/framework/Pipeline.java index f818703d..efe02121 100644 --- a/src/main/java/teetime/framework/Pipeline.java +++ b/src/main/java/teetime/framework/Pipeline.java @@ -26,17 +26,17 @@ import java.util.List; * the type of the last stage in this pipeline */ // TODO Consider to move it in the framework -public final class Pipeline<L extends Stage> extends AbstractCompositeStage { +public final class Pipeline<L extends AbstractStage> extends AbstractCompositeStage { - private final Stage firstStage; + private final AbstractStage firstStage; private final List<L> lastStages = new LinkedList<L>(); - public Pipeline(final Stage firstStage, final L lastStage) { + public Pipeline(final AbstractStage firstStage, final L lastStage) { this.firstStage = firstStage; this.lastStages.add(lastStage); } - public Stage getFirstStage() { + public AbstractStage getFirstStage() { return firstStage; } diff --git a/src/main/java/teetime/stage/io/filesystem/Dir2RecordsFilter.java b/src/main/java/teetime/stage/io/filesystem/Dir2RecordsFilter.java index 66189863..c22fa6ac 100644 --- a/src/main/java/teetime/stage/io/filesystem/Dir2RecordsFilter.java +++ b/src/main/java/teetime/stage/io/filesystem/Dir2RecordsFilter.java @@ -17,10 +17,14 @@ package teetime.stage.io.filesystem; import java.io.File; +import kieker.common.record.IMonitoringRecord; +import kieker.common.util.filesystem.BinaryCompressionMethod; +import kieker.common.util.filesystem.FSUtil; + import teetime.framework.AbstractCompositeStage; +import teetime.framework.AbstractStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.Stage; import teetime.stage.FileExtensionSwitch; import teetime.stage.basic.merger.Merger; import teetime.stage.className.ClassNameRegistryCreationFilter; @@ -29,10 +33,6 @@ import teetime.stage.io.Directory2FilesFilter; import teetime.stage.io.filesystem.format.binary.file.BinaryFile2RecordFilter; import teetime.stage.io.filesystem.format.text.file.DatFile2RecordFilter; -import kieker.common.record.IMonitoringRecord; -import kieker.common.util.filesystem.BinaryCompressionMethod; -import kieker.common.util.filesystem.FSUtil; - /** * @author Christian Wulf * @@ -85,7 +85,7 @@ public final class Dir2RecordsFilter extends AbstractCompositeStage { this.classNameRegistryCreationFilter = classNameRegistryCreationFilter; } - public Stage getFirstStage() { + public AbstractStage getFirstStage() { return classNameRegistryCreationFilter; } diff --git a/src/main/java/teetime/stage/io/filesystem/format/binary/DirWithBin2RecordFilter.java b/src/main/java/teetime/stage/io/filesystem/format/binary/DirWithBin2RecordFilter.java index bed16b27..9a405c58 100644 --- a/src/main/java/teetime/stage/io/filesystem/format/binary/DirWithBin2RecordFilter.java +++ b/src/main/java/teetime/stage/io/filesystem/format/binary/DirWithBin2RecordFilter.java @@ -17,16 +17,16 @@ package teetime.stage.io.filesystem.format.binary; import java.io.File; +import kieker.common.record.IMonitoringRecord; + import teetime.framework.AbstractCompositeStage; +import teetime.framework.AbstractStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.Stage; import teetime.stage.className.ClassNameRegistryCreationFilter; import teetime.stage.className.ClassNameRegistryRepository; import teetime.stage.io.filesystem.format.binary.file.BinaryFile2RecordFilter; -import kieker.common.record.IMonitoringRecord; - public class DirWithBin2RecordFilter extends AbstractCompositeStage { private final ClassNameRegistryCreationFilter classNameRegistryCreationFilter; @@ -53,7 +53,7 @@ public class DirWithBin2RecordFilter extends AbstractCompositeStage { this.classNameRegistryRepository = classNameRegistryRepository; } - protected Stage getFirstStage() { + protected AbstractStage getFirstStage() { return classNameRegistryCreationFilter; } diff --git a/src/main/java/teetime/stage/io/filesystem/format/text/file/DatFile2RecordFilter.java b/src/main/java/teetime/stage/io/filesystem/format/text/file/DatFile2RecordFilter.java index d2ae9fe4..a5e1f8d4 100644 --- a/src/main/java/teetime/stage/io/filesystem/format/text/file/DatFile2RecordFilter.java +++ b/src/main/java/teetime/stage/io/filesystem/format/text/file/DatFile2RecordFilter.java @@ -17,15 +17,15 @@ package teetime.stage.io.filesystem.format.text.file; import java.io.File; +import kieker.common.record.IMonitoringRecord; + import teetime.framework.AbstractCompositeStage; +import teetime.framework.AbstractStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.Stage; import teetime.stage.className.ClassNameRegistryRepository; import teetime.stage.io.File2TextLinesFilter; -import kieker.common.record.IMonitoringRecord; - /** * @author Christian Wulf * @@ -43,7 +43,7 @@ public class DatFile2RecordFilter extends AbstractCompositeStage { connectPorts(file2TextLinesFilter.getOutputPort(), textLine2RecordFilter.getInputPort()); } - public Stage getFirstStage() { + public AbstractStage getFirstStage() { return file2TextLinesFilter; } diff --git a/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java b/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java index f50b5220..6c985058 100644 --- a/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java +++ b/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java @@ -19,15 +19,15 @@ import java.io.File; import java.util.LinkedList; import java.util.List; +import kieker.common.record.IMonitoringRecord; + +import teetime.framework.AbstractStage; import teetime.framework.Configuration; -import teetime.framework.Stage; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; import teetime.stage.className.ClassNameRegistryRepository; import teetime.stage.io.filesystem.Dir2RecordsFilter; -import kieker.common.record.IMonitoringRecord; - /** * @author Christian Wulf * @@ -42,11 +42,11 @@ public class RecordReaderConfiguration extends Configuration { } private void buildConfiguration() { - final Stage producerPipeline = this.buildProducerPipeline(); + final AbstractStage producerPipeline = this.buildProducerPipeline(); declareActive(producerPipeline); } - private Stage buildProducerPipeline() { + private AbstractStage buildProducerPipeline() { ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository(); File logDir = new File("src/test/data/bookstore-logs"); // create stages diff --git a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java index 1e145c27..d624114a 100644 --- a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java +++ b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java @@ -17,9 +17,11 @@ package teetime.examples.traceReading; import java.util.List; +import kieker.common.record.IMonitoringRecord; + +import teetime.framework.AbstractStage; import teetime.framework.Configuration; import teetime.framework.Pipeline; -import teetime.framework.Stage; import teetime.stage.Clock; import teetime.stage.Counter; import teetime.stage.ElementThroughputMeasuringStage; @@ -27,8 +29,6 @@ import teetime.stage.basic.Sink; import teetime.stage.basic.distributor.Distributor; import teetime.stage.io.network.TcpReaderStage; -import kieker.common.record.IMonitoringRecord; - public class TcpTraceLoggingExtAnalysisConfiguration extends Configuration { private Counter<IMonitoringRecord> recordCounter; @@ -41,7 +41,7 @@ public class TcpTraceLoggingExtAnalysisConfiguration extends Configuration { private void init() { final Pipeline<Distributor<Long>> clockPipeline = this.buildClockPipeline(1000); declareActive(clockPipeline.getFirstStage()); - final Stage tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage()); + final AbstractStage tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage()); declareActive(tcpPipeline); } @@ -56,7 +56,7 @@ public class TcpTraceLoggingExtAnalysisConfiguration extends Configuration { return new Pipeline<Distributor<Long>>(clockStage, distributor); } - private Stage buildTcpPipeline(final Distributor<Long> previousClockStage) { + private AbstractStage buildTcpPipeline(final Distributor<Long> previousClockStage) { TcpReaderStage tcpReader = new TcpReaderStage(); this.recordCounter = new Counter<IMonitoringRecord>(); this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>(); diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionConf.java b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionConf.java index c05eb5a6..c84dd0a1 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionConf.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionConf.java @@ -17,9 +17,12 @@ package teetime.examples.traceReconstruction; import java.util.List; +import kieker.common.record.IMonitoringRecord; +import kieker.common.record.flow.IFlowRecord; + +import teetime.framework.AbstractStage; import teetime.framework.Configuration; import teetime.framework.Pipeline; -import teetime.framework.Stage; import teetime.stage.Clock; import teetime.stage.Counter; import teetime.stage.ElementThroughputMeasuringStage; @@ -32,9 +35,6 @@ import teetime.stage.trace.traceReconstruction.EventBasedTraceFactory; import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter; import teetime.util.ConcurrentHashMapWithDefault; -import kieker.common.record.IMonitoringRecord; -import kieker.common.record.flow.IFlowRecord; - public class TcpTraceReconstructionConf extends Configuration { private static final int MIO = 1000000; @@ -59,7 +59,7 @@ public class TcpTraceReconstructionConf extends Configuration { Pipeline<Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); declareActive(clock2Stage.getFirstStage()); - Stage pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage()); + AbstractStage pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage()); declareActive(pipeline); } @@ -73,7 +73,7 @@ public class TcpTraceReconstructionConf extends Configuration { return new Pipeline<Distributor<Long>>(clock, distributor); } - private Stage buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { + private AbstractStage buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { // create stages TcpReaderStage tcpReader = new TcpReaderStage(); this.recordCounter = new Counter<IMonitoringRecord>(); diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionConf.java b/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionConf.java index 0afcb050..717ff484 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionConf.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionConf.java @@ -19,8 +19,11 @@ import java.io.File; import java.util.LinkedList; import java.util.List; +import kieker.common.record.IMonitoringRecord; +import kieker.common.record.flow.IFlowRecord; + +import teetime.framework.AbstractStage; import teetime.framework.Configuration; -import teetime.framework.Stage; import teetime.stage.Cache; import teetime.stage.Clock; import teetime.stage.CollectorSink; @@ -39,9 +42,6 @@ import teetime.stage.trace.traceReconstruction.EventBasedTraceFactory; import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter; import teetime.util.ConcurrentHashMapWithDefault; -import kieker.common.record.IMonitoringRecord; -import kieker.common.record.flow.IFlowRecord; - public class TraceReconstructionConf extends Configuration { private final List<EventBasedTrace> elementCollection = new LinkedList<EventBasedTrace>(); @@ -64,7 +64,7 @@ public class TraceReconstructionConf extends Configuration { Clock clockStage = this.buildClockPipeline(); declareActive(clockStage); - Stage pipeline = this.buildPipeline(clockStage); + AbstractStage pipeline = this.buildPipeline(clockStage); declareActive(pipeline); } @@ -75,7 +75,7 @@ public class TraceReconstructionConf extends Configuration { return clock; } - private Stage buildPipeline(final Clock clockStage) { + private AbstractStage buildPipeline(final Clock clockStage) { this.classNameRegistryRepository = new ClassNameRegistryRepository(); // create stages diff --git a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java index 6484124b..47a284c7 100644 --- a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java +++ b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java @@ -21,10 +21,13 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import kieker.common.record.IMonitoringRecord; +import kieker.common.record.flow.IFlowRecord; +import kieker.common.record.flow.trace.TraceMetadata; + import teetime.framework.AbstractStage; import teetime.framework.Configuration; import teetime.framework.Pipeline; -import teetime.framework.Stage; import teetime.framework.pipe.IMonitorablePipe; import teetime.stage.Clock; import teetime.stage.Counter; @@ -41,10 +44,6 @@ import teetime.stage.trace.traceReconstruction.EventBasedTraceFactory; import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter; import teetime.util.ConcurrentHashMapWithDefault; -import kieker.common.record.IMonitoringRecord; -import kieker.common.record.flow.IFlowRecord; -import kieker.common.record.flow.trace.TraceMetadata; - public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Configuration { private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors(); @@ -101,7 +100,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Conf declareActive(clock2Stage.getFirstStage()); for (int i = 0; i < this.numWorkerThreads; i++) { - Stage pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), clock2Stage.getLastStage()); + AbstractStage pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), clock2Stage.getLastStage()); declareActive(pipeline); } } @@ -156,7 +155,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Conf } } - private Stage buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, final Distributor<Long> clockStage, + private AbstractStage buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); diff --git a/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreadsConfiguration.java b/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreadsConfiguration.java index ebea3fcd..60672438 100644 --- a/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreadsConfiguration.java +++ b/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreadsConfiguration.java @@ -23,10 +23,13 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import kieker.common.record.IMonitoringRecord; +import kieker.common.record.flow.IFlowRecord; +import kieker.common.record.flow.trace.TraceMetadata; + import teetime.framework.AbstractStage; import teetime.framework.Configuration; import teetime.framework.Pipeline; -import teetime.framework.Stage; import teetime.framework.pipe.IMonitorablePipe; import teetime.stage.Clock; import teetime.stage.Counter; @@ -46,10 +49,6 @@ import teetime.stage.trace.traceReduction.TraceAggregationBuffer; import teetime.stage.trace.traceReduction.TraceReductionFilter; import teetime.util.ConcurrentHashMapWithDefault; -import kieker.common.record.IMonitoringRecord; -import kieker.common.record.flow.IFlowRecord; -import kieker.common.record.flow.trace.TraceMetadata; - public class TcpTraceReductionAnalysisWithThreadsConfiguration extends Configuration { private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors(); @@ -97,7 +96,7 @@ public class TcpTraceReductionAnalysisWithThreadsConfiguration extends Configura declareActive(clock2Stage.getFirstStage()); for (int i = 0; i < this.numWorkerThreads; i++) { - final Stage pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); + final AbstractStage pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); declareActive(pipeline); } } @@ -158,7 +157,7 @@ public class TcpTraceReductionAnalysisWithThreadsConfiguration extends Configura private final StageFactory<Counter<TraceAggregationBuffer>> traceCounterFactory; private final StageFactory<ElementThroughputMeasuringStage<TraceAggregationBuffer>> traceThroughputFilterFactory; - private Stage buildPipeline(final Pipeline<Distributor<IMonitoringRecord>> tcpPipeline, final Pipeline<Distributor<Long>> clockStage, + private AbstractStage buildPipeline(final Pipeline<Distributor<IMonitoringRecord>> tcpPipeline, final Pipeline<Distributor<Long>> clockStage, final Pipeline<Distributor<Long>> clock2Stage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); diff --git a/src/test/java/dummy b/src/test/java/dummy deleted file mode 100644 index e69de29b..00000000 diff --git a/src/test/java/teetime/stage/io/filesystem/Dir2RecordsFilterTest.java b/src/test/java/teetime/stage/io/filesystem/Dir2RecordsFilterTest.java index dc3f1d32..486bba06 100644 --- a/src/test/java/teetime/stage/io/filesystem/Dir2RecordsFilterTest.java +++ b/src/test/java/teetime/stage/io/filesystem/Dir2RecordsFilterTest.java @@ -19,17 +19,17 @@ import java.io.File; import org.junit.Test; +import kieker.common.record.IMonitoringRecord; + import teetime.framework.AbstractCompositeStage; +import teetime.framework.AbstractStage; import teetime.framework.Configuration; import teetime.framework.Execution; import teetime.framework.OutputPort; -import teetime.framework.Stage; import teetime.stage.InitialElementProducer; import teetime.stage.className.ClassNameRegistryRepository; import teetime.stage.io.Printer; -import kieker.common.record.IMonitoringRecord; - public class Dir2RecordsFilterTest { class TestConfiguration extends Configuration { @@ -59,7 +59,7 @@ public class Dir2RecordsFilterTest { return this.reader.getOutputPort(); } - public Stage getFirstStage() { + public AbstractStage getFirstStage() { return this.producer; } -- GitLab