diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java index 7079f8edba4ca921f245e70f129086093dcb9c88..234a1c598e14e83a5af80213ccbf26a46cdc0f86 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java @@ -10,12 +10,16 @@ public class Analysis { private static final Logger LOGGER = LoggerFactory.getLogger(Analysis.class); - private Configuration configuration; + private final Configuration configuration; private final List<Thread> consumerThreads = new LinkedList<Thread>(); private final List<Thread> finiteProducerThreads = new LinkedList<Thread>(); private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>(); + public Analysis(final Configuration configuration) { + this.configuration = configuration; + } + public void init() { for (StageWithPort stage : this.configuration.getConsumerStages()) { Thread thread = new Thread(new RunnableStage(stage)); @@ -76,8 +80,4 @@ public class Analysis { public Configuration getConfiguration() { return this.configuration; } - - public void setConfiguration(final Configuration configuration) { - this.configuration = configuration; - } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java index f8456a931803d15d7140f5d331f0856e4ad89032..574f822a2c71916d82ace1dce3ef0a6611ef4810 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java @@ -1,24 +1,18 @@ package teetime.variant.methodcallWithPorts.stage; -import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; import teetime.variant.methodcallWithPorts.framework.core.InputPort; -import teetime.variant.methodcallWithPorts.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; import teetime.variant.methodcallWithPorts.framework.core.Signal; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; -public class Relay<T> extends AbstractStage { +public class Relay<T> extends ProducerStage<T> { private final InputPort<T> inputPort = this.createInputPort(); - private final OutputPort<T> outputPort = this.createOutputPort(); private SpScPipe<T> cachedCastedInputPipe; - public Relay() { - this.setReschedulable(true); - } - @Override - public void executeWithPorts() { + public void execute() { T element = this.inputPort.receive(); if (null == element) { // if (this.getInputPort().getPipe().isClosed()) { @@ -48,8 +42,4 @@ public class Relay<T> extends AbstractStage { public InputPort<T> getInputPort() { return this.inputPort; } - - public OutputPort<T> getOutputPort() { - return this.outputPort; - } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysisTest.java index c1c78def138fc3406376c32d94bc9bf22d7f8856..a1f21503364416c931028dcb0657ec8b06f41043 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysisTest.java @@ -24,6 +24,7 @@ import org.junit.Before; import org.junit.Test; import teetime.util.StopWatch; +import teetime.variant.methodcallWithPorts.framework.core.Analysis; import kieker.common.record.IMonitoringRecord; import kieker.common.record.controlflow.OperationExecutionRecord; @@ -51,7 +52,10 @@ public class RecordReaderAnalysisTest { @Test public void performAnalysis() { - final RecordReaderAnalysis analysis = new RecordReaderAnalysis(); + final RecordReaderConfiguration configuration = new RecordReaderConfiguration(); + configuration.buildConfiguration(); + + Analysis analysis = new Analysis(configuration); analysis.init(); this.stopWatch.start(); @@ -61,19 +65,19 @@ public class RecordReaderAnalysisTest { this.stopWatch.end(); } - assertEquals(6541, analysis.getElementCollection().size()); + assertEquals(6541, configuration.getElementCollection().size()); - KiekerMetadataRecord metadataRecord = (KiekerMetadataRecord) analysis.getElementCollection().get(0); + KiekerMetadataRecord metadataRecord = (KiekerMetadataRecord) configuration.getElementCollection().get(0); assertEquals("1.9-SNAPSHOT", metadataRecord.getVersion()); assertEquals("NANOSECONDS", metadataRecord.getTimeUnit()); - IMonitoringRecord monitoringRecord = analysis.getElementCollection().get(1); + IMonitoringRecord monitoringRecord = configuration.getElementCollection().get(1); OperationExecutionRecord oer = (OperationExecutionRecord) monitoringRecord; assertEquals("bookstoreTracing.Catalog.getBook(boolean)", oer.getOperationSignature()); assertEquals(1283156498771185344l, oer.getTin()); assertEquals(1283156498773323582l, oer.getTout()); - monitoringRecord = analysis.getElementCollection().get(analysis.getElementCollection().size() - 1); + monitoringRecord = configuration.getElementCollection().get(configuration.getElementCollection().size() - 1); oer = (OperationExecutionRecord) monitoringRecord; assertEquals("bookstoreTracing.Bookstore.searchBook()", oer.getOperationSignature()); assertEquals(1283156499331233504l, oer.getTin()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java similarity index 78% rename from src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java rename to src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java index 8da4ca05f7ce9d7a2480228bc2c7835c6fea31f7..f1adf9271d72ccfe53323e8bc6bdb65adb43c612 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java @@ -19,7 +19,6 @@ import java.io.File; import java.util.LinkedList; import java.util.List; -import teetime.variant.methodcallWithPorts.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.Configuration; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; @@ -36,42 +35,28 @@ import kieker.common.record.IMonitoringRecord; * * @since 1.10 */ -public class RecordReaderAnalysis extends Analysis { +public class RecordReaderConfiguration extends Configuration { private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>(); - private ClassNameRegistryRepository classNameRegistryRepository; - - @Override - public void init() { - Configuration configuration = this.buildConfiguration(); - this.setConfiguration(configuration); - - super.init(); - } - - private Configuration buildConfiguration() { - Configuration localConfiguration = new Configuration(); - + public void buildConfiguration() { StageWithPort producerPipeline = this.buildProducerPipeline(); - localConfiguration.getFiniteProducerStages().add(producerPipeline); - - return localConfiguration; + this.getFiniteProducerStages().add(producerPipeline); } private StageWithPort buildProducerPipeline() { - this.classNameRegistryRepository = new ClassNameRegistryRepository(); + ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository(); // create stages - Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(this.classNameRegistryRepository); + Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository); CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection); final Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>(); pipeline.setFirstStage(dir2RecordsFilter); pipeline.setLastStage(collector); - dir2RecordsFilter.getInputPort().setPipe(new SpScPipe<File>(1)); SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), collector.getInputPort()); + dir2RecordsFilter.getInputPort().setPipe(new SpScPipe<File>(1)); dir2RecordsFilter.getInputPort().getPipe().add(new File("src/test/data/bookstore-logs")); return pipeline; diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java index 640f05e2a92024605d6b35a03c1756e7b62d1f2b..77025f2291d6bfaa92b54dc95af7c7f1e5aa82d9 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -35,6 +35,7 @@ import org.junit.runners.MethodSorters; import teetime.util.ListUtil; import teetime.util.StatisticsUtil; import teetime.util.StopWatch; +import teetime.variant.methodcallWithPorts.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import kieker.common.record.IMonitoringRecord; @@ -80,8 +81,11 @@ public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest { } void performAnalysis(final int numWorkerThreads) { - final TcpTraceReconstructionAnalysisWithThreads analysis = new TcpTraceReconstructionAnalysisWithThreads(); - analysis.setNumWorkerThreads(numWorkerThreads); + final TcpTraceReconstructionAnalysisWithThreadsConfiguration configuration = new TcpTraceReconstructionAnalysisWithThreadsConfiguration(); + configuration.setNumWorkerThreads(numWorkerThreads); + configuration.buildConfiguration(); + + Analysis analysis = new Analysis(configuration); analysis.init(); this.stopWatch.start(); @@ -92,7 +96,7 @@ public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest { } int maxNumWaits = 0; - for (SpScPipe<IMonitoringRecord> pipe : analysis.getTcpRelayPipes()) { + for (SpScPipe<IMonitoringRecord> pipe : configuration.getTcpRelayPipes()) { maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits()); } System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits); @@ -106,7 +110,7 @@ public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest { // Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays()); // System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace"); - List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(analysis.getRecordThroughputs()); + List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(configuration.getRecordThroughputs()); Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(recordThroughputs); System.out.println("Median record throughput: " + recordQuintiles.get(0.5) + " elements/time unit"); @@ -120,10 +124,10 @@ public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest { // TraceEventRecords trace6886 = analysis.getElementCollection().get(1); // assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); - assertEquals("#records", EXPECTED_NUM_RECORDS, analysis.getNumRecords()); - assertEquals("#traces", EXPECTED_NUM_TRACES, analysis.getNumTraces()); + assertEquals("#records", EXPECTED_NUM_RECORDS, configuration.getNumRecords()); + assertEquals("#traces", EXPECTED_NUM_TRACES, configuration.getNumTraces()); - for (Integer count : analysis.getNumTraceMetadatas()) { + for (Integer count : configuration.getNumTraceMetadatas()) { assertEquals("#traceMetadata per worker thread", EXPECTED_NUM_TRACES / numWorkerThreads, count.intValue()); // even distribution } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java index 55b38855afd878f0c5fd6a99e8bced42f7c23a45..dd066a91985e8a22d8a63016550b95636831df8f 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -30,6 +30,7 @@ import org.junit.runners.MethodSorters; import teetime.util.ListUtil; import teetime.util.StatisticsUtil; import teetime.util.StopWatch; +import teetime.variant.methodcallWithPorts.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import kieker.common.record.IMonitoringRecord; @@ -88,8 +89,11 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { // Duration: 22373 ms void performAnalysis(final int numWorkerThreads) { - final TcpTraceReconstructionAnalysisWithThreads analysis = new TcpTraceReconstructionAnalysisWithThreads(); - analysis.setNumWorkerThreads(numWorkerThreads); + final TcpTraceReconstructionAnalysisWithThreadsConfiguration configuration = new TcpTraceReconstructionAnalysisWithThreadsConfiguration(); + configuration.setNumWorkerThreads(numWorkerThreads); + configuration.buildConfiguration(); + + Analysis analysis = new Analysis(configuration); analysis.init(); this.stopWatch.start(); @@ -100,7 +104,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { } int maxNumWaits = 0; - for (SpScPipe<IMonitoringRecord> pipe : analysis.getTcpRelayPipes()) { + for (SpScPipe<IMonitoringRecord> pipe : configuration.getTcpRelayPipes()) { maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits()); } System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits); @@ -114,7 +118,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { // Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays()); // System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace"); - List<Long> traceThroughputs = ListUtil.removeFirstHalfElements(analysis.getTraceThroughputs()); + List<Long> traceThroughputs = ListUtil.removeFirstHalfElements(configuration.getTraceThroughputs()); Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(traceThroughputs); System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit"); @@ -124,13 +128,13 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { // TraceEventRecords trace6886 = analysis.getElementCollection().get(1); // assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); - assertEquals("#records", 21000001, analysis.getNumRecords()); + assertEquals("#records", 21000001, configuration.getNumRecords()); - for (Integer count : analysis.getNumTraceMetadatas()) { + for (Integer count : configuration.getNumTraceMetadatas()) { assertEquals("#traceMetadata per worker thread", EXPECTED_NUM_TRACES / numWorkerThreads, count.intValue()); // even distribution } - assertEquals("#traces", EXPECTED_NUM_TRACES, analysis.getNumTraces()); + assertEquals("#traces", EXPECTED_NUM_TRACES, configuration.getNumTraces()); } public static void main(final String[] args) { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java similarity index 94% rename from src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java rename to src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java index d1986fdb32042ddb8071fe119002ca58c1ce41eb..9d17509d5097e97ea7bb6c1c92cecef8e05528f9 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java @@ -8,7 +8,6 @@ import java.util.List; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; -import teetime.variant.methodcallWithPorts.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.Configuration; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; @@ -31,7 +30,7 @@ import kieker.common.record.IMonitoringRecord; import kieker.common.record.flow.IFlowRecord; import kieker.common.record.flow.trace.TraceMetadata; -public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { +public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Configuration { private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors(); private static final int MIO = 1000000; @@ -54,7 +53,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new LinkedList<SpScPipe<IMonitoringRecord>>(); @SuppressWarnings({ "rawtypes", "unchecked" }) - public TcpTraceReconstructionAnalysisWithThreads() { + public TcpTraceReconstructionAnalysisWithThreadsConfiguration() { super(); try { @@ -72,33 +71,21 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { } } - @Override - public void init() { - Configuration configuration = this.buildConfiguration(); - this.setConfiguration(configuration); - - super.init(); - } - - private Configuration buildConfiguration() { - Configuration localConfiguration = new Configuration(); - + public void buildConfiguration() { final Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); - localConfiguration.getFiniteProducerStages().add(tcpPipeline); + this.getFiniteProducerStages().add(tcpPipeline); final Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); - localConfiguration.getInfiniteProducerStages().add(clockStage); + this.getInfiniteProducerStages().add(clockStage); final Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); - localConfiguration.getInfiniteProducerStages().add(clock2Stage); + this.getInfiniteProducerStages().add(clock2Stage); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); for (int i = 0; i < this.numWorkerThreads; i++) { StageWithPort pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), clock2Stage.getLastStage()); - localConfiguration.getConsumerStages().add(pipeline); + this.getConsumerStages().add(pipeline); } - - return localConfiguration; } private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {