From 4fa07eb7628a1717a466d0af4f4c4f7f7b2527b0 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Mon, 11 Aug 2014 07:55:02 +0200 Subject: [PATCH] added analysis configuration concept --- conf/logback.groovy | 4 +- .../framework/core/Analysis.java | 83 ++++++++++++ .../framework/core/Configuration.java | 24 ++++ .../recordReader/RecordReaderAnalysis.java | 33 ++--- .../RecordReaderAnalysisTest.java | 3 +- ...ReconstructionAnalysisWithThreadsTest.java | 3 +- ...ReconstructionAnalysisWithThreadsTest.java | 3 +- ...raceReconstructionAnalysisWithThreads.java | 122 ++++++++---------- 8 files changed, 177 insertions(+), 98 deletions(-) create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/Configuration.java diff --git a/conf/logback.groovy b/conf/logback.groovy index 9501197f..32e3afa2 100644 --- a/conf/logback.groovy +++ b/conf/logback.groovy @@ -1,7 +1,5 @@ statusListener(OnConsoleStatusListener) -root(WARN) - /*appender("FILE", FileAppender) { file = "testFile.log" append = true @@ -18,4 +16,6 @@ appender("CONSOLE", ConsoleAppender) { } } +root WARN, ["CONSOLE"] + //logger "teetime.variant.methodcallWithPorts.stage", DEBUG, ["CONSOLE"] \ No newline at end of file diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java new file mode 100644 index 00000000..7079f8ed --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java @@ -0,0 +1,83 @@ +package teetime.variant.methodcallWithPorts.framework.core; + +import java.util.LinkedList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Analysis { + + private static final Logger LOGGER = LoggerFactory.getLogger(Analysis.class); + + private Configuration configuration; + + private final List<Thread> consumerThreads = new LinkedList<Thread>(); + private final List<Thread> finiteProducerThreads = new LinkedList<Thread>(); + private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>(); + + public void init() { + for (StageWithPort stage : this.configuration.getConsumerStages()) { + Thread thread = new Thread(new RunnableStage(stage)); + this.consumerThreads.add(thread); + } + + for (StageWithPort stage : this.configuration.getFiniteProducerStages()) { + Thread thread = new Thread(new RunnableStage(stage)); + this.finiteProducerThreads.add(thread); + } + + for (StageWithPort stage : this.configuration.getInfiniteProducerStages()) { + Thread thread = new Thread(new RunnableStage(stage)); + this.infiniteProducerThreads.add(thread); + } + } + + public void start() { + // start analysis + for (Thread thread : this.consumerThreads) { + thread.start(); + } + + for (Thread thread : this.finiteProducerThreads) { + thread.start(); + } + + for (Thread thread : this.infiniteProducerThreads) { + thread.start(); + } + + // wait for the analysis to complete + try { + for (Thread thread : this.finiteProducerThreads) { + thread.join(); + } + + for (Thread thread : this.consumerThreads) { + thread.join(); + } + } catch (InterruptedException e) { + LOGGER.error("Analysis has stopped unexpectedly", e); + + for (Thread thread : this.finiteProducerThreads) { + thread.interrupt(); + } + + for (Thread thread : this.consumerThreads) { + thread.interrupt(); + } + } + + for (Thread thread : this.infiniteProducerThreads) { + thread.interrupt(); + } + } + + public Configuration getConfiguration() { + return this.configuration; + } + + public void setConfiguration(final Configuration configuration) { + this.configuration = configuration; + } +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Configuration.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Configuration.java new file mode 100644 index 00000000..fcac06f4 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Configuration.java @@ -0,0 +1,24 @@ +package teetime.variant.methodcallWithPorts.framework.core; + +import java.util.LinkedList; +import java.util.List; + +public class Configuration { + + private final List<StageWithPort> consumerStages = new LinkedList<StageWithPort>(); + private final List<StageWithPort> finiteProducerStages = new LinkedList<StageWithPort>(); + private final List<StageWithPort> infiniteProducerStages = new LinkedList<StageWithPort>(); + + public List<StageWithPort> getConsumerStages() { + return this.consumerStages; + } + + public List<StageWithPort> getFiniteProducerStages() { + return this.finiteProducerStages; + } + + public List<StageWithPort> getInfiniteProducerStages() { + return this.infiniteProducerStages; + } + +} diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java index b0b778ab..8da4ca05 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysis.java @@ -19,9 +19,9 @@ import java.io.File; import java.util.LinkedList; import java.util.List; -import teetime.variant.explicitScheduling.framework.core.Analysis; +import teetime.variant.methodcallWithPorts.framework.core.Analysis; +import teetime.variant.methodcallWithPorts.framework.core.Configuration; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; -import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; @@ -33,22 +33,30 @@ import kieker.common.record.IMonitoringRecord; /** * @author Christian Wulf - * + * * @since 1.10 */ public class RecordReaderAnalysis extends Analysis { private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>(); - private Thread producerThread; - private ClassNameRegistryRepository classNameRegistryRepository; @Override public void init() { + Configuration configuration = this.buildConfiguration(); + this.setConfiguration(configuration); + super.init(); + } + + private Configuration buildConfiguration() { + Configuration localConfiguration = new Configuration(); + StageWithPort producerPipeline = this.buildProducerPipeline(); - this.producerThread = new Thread(new RunnableStage(producerPipeline)); + localConfiguration.getFiniteProducerStages().add(producerPipeline); + + return localConfiguration; } private StageWithPort buildProducerPipeline() { @@ -69,19 +77,6 @@ public class RecordReaderAnalysis extends Analysis { return pipeline; } - @Override - public void start() { - super.start(); - - this.producerThread.start(); - - try { - this.producerThread.join(); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - } - public List<IMonitoringRecord> getElementCollection() { return this.elementCollection; } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysisTest.java index 54dac502..c1c78def 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysisTest.java @@ -31,7 +31,7 @@ import kieker.common.record.misc.KiekerMetadataRecord; /** * @author Christian Wulf - * + * * @since 1.10 */ public class RecordReaderAnalysisTest { @@ -59,7 +59,6 @@ public class RecordReaderAnalysisTest { analysis.start(); } finally { this.stopWatch.end(); - analysis.onTerminate(); } assertEquals(6541, analysis.getElementCollection().size()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java index dbb53627..640f05e2 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -41,7 +41,7 @@ import kieker.common.record.IMonitoringRecord; /** * @author Christian Wulf - * + * * @since 1.10 */ @FixMethodOrder(MethodSorters.NAME_ASCENDING) @@ -89,7 +89,6 @@ public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest { analysis.start(); } finally { this.stopWatch.end(); - analysis.onTerminate(); } int maxNumWaits = 0; diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java index 147bed62..55b38855 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -36,7 +36,7 @@ import kieker.common.record.IMonitoringRecord; /** * @author Christian Wulf - * + * * @since 1.10 */ @FixMethodOrder(MethodSorters.NAME_ASCENDING) @@ -97,7 +97,6 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { analysis.start(); } finally { this.stopWatch.end(); - analysis.onTerminate(); } int maxNumWaits = 0; diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java index 0a718b4f..d1986fdb 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java @@ -8,9 +8,9 @@ import java.util.List; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; -import teetime.variant.explicitScheduling.framework.core.Analysis; +import teetime.variant.methodcallWithPorts.framework.core.Analysis; +import teetime.variant.methodcallWithPorts.framework.core.Configuration; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; -import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; @@ -39,32 +39,66 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); - private Thread tcpThread; - private Thread clockThread; - private Thread clock2Thread; - private Thread[] workerThreads; - private int numWorkerThreads; + private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); + + private final StageFactory<Counter<IMonitoringRecord>> recordCounterFactory; + private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordDelayFilterFactory; + private final StageFactory<ElementThroughputMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory; + private final StageFactory<InstanceCounter<IMonitoringRecord, TraceMetadata>> traceMetadataCounterFactory; + private final StageFactory<TraceReconstructionFilter> traceReconstructionFilterFactory; + private final StageFactory<Counter<TraceEventRecords>> traceCounterFactory; + private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory; + + private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new LinkedList<SpScPipe<IMonitoringRecord>>(); + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public TcpTraceReconstructionAnalysisWithThreads() { + super(); + + try { + this.recordCounterFactory = new StageFactory(Counter.class.getConstructor()); + this.recordDelayFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor()); + this.recordThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor()); + this.traceMetadataCounterFactory = new StageFactory(InstanceCounter.class.getConstructor(Class.class)); + this.traceReconstructionFilterFactory = new StageFactory(TraceReconstructionFilter.class.getConstructor(ConcurrentHashMapWithDefault.class)); + this.traceCounterFactory = new StageFactory(Counter.class.getConstructor()); + this.traceThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor()); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException(e); + } catch (SecurityException e) { + throw new IllegalArgumentException(e); + } + } + @Override public void init() { + Configuration configuration = this.buildConfiguration(); + this.setConfiguration(configuration); + super.init(); - Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); - this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); + } - Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); - this.clockThread = new Thread(new RunnableStage(clockStage)); + private Configuration buildConfiguration() { + Configuration localConfiguration = new Configuration(); - Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); - this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); + final Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + localConfiguration.getFiniteProducerStages().add(tcpPipeline); - this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); - this.workerThreads = new Thread[this.numWorkerThreads]; + final Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); + localConfiguration.getInfiniteProducerStages().add(clockStage); + + final Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); + localConfiguration.getInfiniteProducerStages().add(clock2Stage); - for (int i = 0; i < this.workerThreads.length; i++) { + this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); + for (int i = 0; i < this.numWorkerThreads; i++) { StageWithPort pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), clock2Stage.getLastStage()); - this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); + localConfiguration.getConsumerStages().add(pipeline); } + + return localConfiguration; } private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { @@ -125,35 +159,6 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { } } - private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); - - private final StageFactory<Counter<IMonitoringRecord>> recordCounterFactory; - private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordDelayFilterFactory; - private final StageFactory<ElementThroughputMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory; - private final StageFactory<InstanceCounter<IMonitoringRecord, TraceMetadata>> traceMetadataCounterFactory; - private final StageFactory<TraceReconstructionFilter> traceReconstructionFilterFactory; - private final StageFactory<Counter<TraceEventRecords>> traceCounterFactory; - private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory; - - private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new LinkedList<SpScPipe<IMonitoringRecord>>(); - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public TcpTraceReconstructionAnalysisWithThreads() { - try { - this.recordCounterFactory = new StageFactory(Counter.class.getConstructor()); - this.recordDelayFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor()); - this.recordThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor()); - this.traceMetadataCounterFactory = new StageFactory(InstanceCounter.class.getConstructor(Class.class)); - this.traceReconstructionFilterFactory = new StageFactory(TraceReconstructionFilter.class.getConstructor(ConcurrentHashMapWithDefault.class)); - this.traceCounterFactory = new StageFactory(Counter.class.getConstructor()); - this.traceThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor()); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException(e); - } catch (SecurityException e) { - throw new IllegalArgumentException(e); - } - } - private StageWithPort buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { // create stages @@ -206,31 +211,6 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { return pipeline; } - @Override - public void start() { - super.start(); - - this.clockThread.start(); - this.clock2Thread.start(); - this.tcpThread.start(); - - for (Thread workerThread : this.workerThreads) { - workerThread.start(); - } - - try { - this.tcpThread.join(); - - for (Thread workerThread : this.workerThreads) { - workerThread.join(); - } - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - this.clockThread.interrupt(); - this.clock2Thread.interrupt(); - } - public List<TraceEventRecords> getElementCollection() { return this.elementCollection; } -- GitLab