From 81ce40cf415de5e892d22c77d96f6044d9eda640 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Fri, 26 Dec 2014 07:44:41 +0100 Subject: [PATCH] fixed compiler errors --- src/main/java/teetime/framework/Pipeline.java | 10 +++ src/main/java/util/KiekerLoadDriver.java | 6 +- .../TcpTraceLoggingConfiguration.java | 2 +- .../kiekerdays/TcpTraceLoggingExplorviz.java | 23 ++++-- .../kiekerdays/TcpTraceReconstruction.java | 72 ++++++----------- .../kiekerdays/TcpTraceReduction.java | 78 ++++++------------- ...omeTcpTraceReconstructionAnalysisTest.java | 2 +- ...hwHomeTraceReconstructionAnalysisTest.java | 52 ++++++++----- ...orkTcpTraceReconstructionAnalysisTest.java | 2 +- ...hwWorkTraceReconstructionAnalysisTest.java | 56 +++++++------ ...n.java => TcpTraceReconstructionConf.java} | 4 +- ...ysis.java => TraceReconstructionConf.java} | 32 +------- 12 files changed, 150 insertions(+), 189 deletions(-) rename src/performancetest/java/teetime/examples/traceReconstruction/{TcpTraceReconstructionAnalysisConfiguration.java => TcpTraceReconstructionConf.java} (97%) rename src/performancetest/java/teetime/examples/traceReconstruction/{TraceReconstructionAnalysis.java => TraceReconstructionConf.java} (88%) diff --git a/src/main/java/teetime/framework/Pipeline.java b/src/main/java/teetime/framework/Pipeline.java index 27d11375..c56cefc7 100644 --- a/src/main/java/teetime/framework/Pipeline.java +++ b/src/main/java/teetime/framework/Pipeline.java @@ -58,4 +58,14 @@ public final class Pipeline<L extends Stage> extends Stage { return lastStage; } + @Override + protected InputPort<?>[] getInputPorts() { + return firstStage.getInputPorts(); + } + + @Override + protected boolean isStarted() { + return firstStage.isStarted(); + } + } diff --git a/src/main/java/util/KiekerLoadDriver.java b/src/main/java/util/KiekerLoadDriver.java index eb8146dd..d9a6b789 100644 --- a/src/main/java/util/KiekerLoadDriver.java +++ b/src/main/java/util/KiekerLoadDriver.java @@ -14,7 +14,7 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; -import teetime.framework.RunnableStage; +import teetime.framework.RunnableProducerStage; import teetime.framework.Stage; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry; @@ -35,14 +35,14 @@ public class KiekerLoadDriver { private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>(); private final IPipeFactory intraThreadPipeFactory; - private final RunnableStage runnableStage; + private final RunnableProducerStage runnableStage; private long[] timings; public KiekerLoadDriver(final File directory) { intraThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); Stage producerPipeline = this.buildProducerPipeline(directory); - runnableStage = new RunnableStage(producerPipeline); + runnableStage = new RunnableProducerStage(producerPipeline); } private Stage buildProducerPipeline(final File directory) { diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingConfiguration.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingConfiguration.java index 52a051ce..ce61d016 100644 --- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingConfiguration.java +++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingConfiguration.java @@ -8,7 +8,7 @@ import teetime.framework.Stage; import teetime.stage.io.network.TcpReader; import teetime.util.Pair; -public class TcpTraceLoggingConfiguration extends AnalysisConfiguration { +class TcpTraceLoggingConfiguration extends AnalysisConfiguration { public TcpTraceLoggingConfiguration() { Stage tcpPipeline = this.buildTcpPipeline(); diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingExplorviz.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingExplorviz.java index eec9b2aa..339fca62 100644 --- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingExplorviz.java +++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLoggingExplorviz.java @@ -1,24 +1,33 @@ package teetime.examples.kiekerdays; +import teetime.framework.AnalysisConfiguration; +import teetime.framework.RunnableProducerStage; import teetime.framework.Stage; -import teetime.framework.RunnableStage; -import teetime.framework.pipe.SingleElementPipe; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.basic.Sink; import teetime.stage.explorviz.KiekerRecordTcpReader; import kieker.common.record.IMonitoringRecord; -public class TcpTraceLoggingExplorviz { +class TcpTraceLoggingExplorviz extends AnalysisConfiguration { private Thread tcpThread; + private final IPipeFactory intraThreadPipeFactory; - public void init() { + public TcpTraceLoggingExplorviz() { + intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + // interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + init(); + } + + private void init() { Stage tcpPipeline = this.buildTcpPipeline(); - this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); + this.tcpThread = new Thread(new RunnableProducerStage(tcpPipeline)); } public void start() { - this.tcpThread.start(); try { @@ -32,7 +41,7 @@ public class TcpTraceLoggingExplorviz { KiekerRecordTcpReader tcpReader = new KiekerRecordTcpReader(); Sink<IMonitoringRecord> endStage = new Sink<IMonitoringRecord>(); - SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort()); + intraThreadPipeFactory.create(tcpReader.getOutputPort(), endStage.getInputPort()); return tcpReader; } diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java index 3357b1ad..2cb2765d 100644 --- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java +++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java @@ -1,15 +1,16 @@ package teetime.examples.kiekerdays; import java.util.ArrayList; +import java.util.Collection; import java.util.LinkedList; import java.util.List; -import teetime.framework.Stage; +import teetime.framework.Analysis; +import teetime.framework.AnalysisConfiguration; import teetime.framework.Pipeline; -import teetime.framework.RunnableStage; +import teetime.framework.Stage; import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.pipe.SpScPipe; @@ -19,6 +20,7 @@ import teetime.stage.basic.Sink; import teetime.stage.basic.distributor.Distributor; import teetime.stage.io.network.TcpReader; import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter; +import teetime.util.Pair; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; @@ -26,7 +28,7 @@ import kieker.analysis.plugin.filter.flow.TraceEventRecords; import kieker.common.record.IMonitoringRecord; import kieker.common.record.flow.IFlowRecord; -public class TcpTraceReconstruction { +class TcpTraceReconstruction extends AnalysisConfiguration { private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors(); private static final int MIO = 1000000; @@ -39,26 +41,22 @@ public class TcpTraceReconstruction { private final IPipeFactory intraThreadPipeFactory; private final IPipeFactory interThreadPipeFactory; - private Thread tcpThread; - private Thread[] workerThreads; + private final int numWorkerThreads; - private int numWorkerThreads; - - public TcpTraceReconstruction() { - intraThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - interThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + public TcpTraceReconstruction(final int numWorkerThreads) { + intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, numWorkerThreads); + init(); } - public void init() { + private void init() { Pipeline<Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); - this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); - - this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); - this.workerThreads = new Thread[this.numWorkerThreads]; + addThreadableStage(tcpPipeline); - for (int i = 0; i < this.workerThreads.length; i++) { + for (int i = 0; i < this.numWorkerThreads; i++) { Stage pipeline = this.buildPipeline(tcpPipeline.getLastStage()); - this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); + addThreadableStage(pipeline); } } @@ -90,26 +88,7 @@ public class TcpTraceReconstruction { return relay; } - public void start() { - - this.tcpThread.start(); - - for (Thread workerThread : this.workerThreads) { - workerThread.start(); - } - - try { - this.tcpThread.join(); - - for (Thread workerThread : this.workerThreads) { - workerThread.join(); - } - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - } - - public void onTerminate() { + public void printNumWaits() { int maxNumWaits = 0; for (IPipe pipe : this.tcpRelayPipes) { SpScPipe interThreadPipe = (SpScPipe) pipe; @@ -126,22 +105,17 @@ public class TcpTraceReconstruction { return this.numWorkerThreads; } - public void setNumWorkerThreads(final int numWorkerThreads) { - this.numWorkerThreads = numWorkerThreads; - } - public static void main(final String[] args) { int numWorkerThreads = Integer.valueOf(args[0]); - final TcpTraceReconstruction analysis = new TcpTraceReconstruction(); - analysis.setNumWorkerThreads(numWorkerThreads); + final TcpTraceReconstruction configuration = new TcpTraceReconstruction(numWorkerThreads); + Analysis analysis = new Analysis(configuration); analysis.init(); - try { - analysis.start(); - } finally { - analysis.onTerminate(); - } + Collection<Pair<Thread, Throwable>> exceptions = analysis.start(); + + System.out.println("Exceptions: " + exceptions.size()); + configuration.printNumWaits(); } } diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java index e248282a..efc0c4bc 100644 --- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java +++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java @@ -1,17 +1,18 @@ package teetime.examples.kiekerdays; import java.util.ArrayList; +import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TreeMap; -import teetime.framework.Stage; +import teetime.framework.Analysis; +import teetime.framework.AnalysisConfiguration; import teetime.framework.Pipeline; -import teetime.framework.RunnableStage; +import teetime.framework.Stage; import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.pipe.SpScPipe; @@ -25,6 +26,7 @@ import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter; import teetime.stage.trace.traceReduction.TraceAggregationBuffer; import teetime.stage.trace.traceReduction.TraceComperator; import teetime.stage.trace.traceReduction.TraceReductionFilter; +import teetime.util.Pair; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; @@ -32,7 +34,7 @@ import kieker.analysis.plugin.filter.flow.TraceEventRecords; import kieker.common.record.IMonitoringRecord; import kieker.common.record.flow.IFlowRecord; -public class TcpTraceReduction { +class TcpTraceReduction extends AnalysisConfiguration { private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors(); private static final int MIO = 1000000; @@ -43,33 +45,28 @@ public class TcpTraceReduction { private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator()); private final List<IPipe> tcpRelayPipes = new ArrayList<IPipe>(); - private Thread tcpThread; - private Thread clockThread; - private Thread[] workerThreads; - - private int numWorkerThreads; + private final int numWorkerThreads; private final IPipeFactory intraThreadPipeFactory; private final IPipeFactory interThreadPipeFactory; - public TcpTraceReduction() { - intraThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - interThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + public TcpTraceReduction(final int numWorkerThreads) { + this.numWorkerThreads = Math.min(numWorkerThreads, NUM_VIRTUAL_CORES); + intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + init(); } - public void init() { + private void init() { Pipeline<Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); - this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); + addThreadableStage(tcpPipeline); Pipeline<Distributor<Long>> clockStage = this.buildClockPipeline(5000); - this.clockThread = new Thread(new RunnableStage(clockStage)); + addThreadableStage(clockStage); - this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); - this.workerThreads = new Thread[this.numWorkerThreads]; - - for (int i = 0; i < this.workerThreads.length; i++) { + for (int i = 0; i < this.numWorkerThreads; i++) { Stage pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage()); - this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); + addThreadableStage(pipeline); } } @@ -116,28 +113,7 @@ public class TcpTraceReduction { return relay; } - public void start() { - - this.tcpThread.start(); - this.clockThread.start(); - - for (Thread workerThread : this.workerThreads) { - workerThread.start(); - } - - try { - this.tcpThread.join(); - - for (Thread workerThread : this.workerThreads) { - workerThread.join(); - } - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - this.clockThread.interrupt(); - } - - public void onTerminate() { + public void printNumWaits() { int maxNumWaits = 0; for (IPipe pipe : this.tcpRelayPipes) { SpScPipe interThreadPipe = (SpScPipe) pipe; @@ -155,22 +131,18 @@ public class TcpTraceReduction { return this.numWorkerThreads; } - public void setNumWorkerThreads(final int numWorkerThreads) { - this.numWorkerThreads = numWorkerThreads; - } - public static void main(final String[] args) { int numWorkerThreads = Integer.valueOf(args[0]); - final TcpTraceReduction analysis = new TcpTraceReduction(); - analysis.setNumWorkerThreads(numWorkerThreads); + final TcpTraceReduction configuration = new TcpTraceReduction(numWorkerThreads); + Analysis analysis = new Analysis(configuration); analysis.init(); - try { - analysis.start(); - } finally { - analysis.onTerminate(); - } + + Collection<Pair<Thread, Throwable>> exceptions = analysis.start(); + + System.out.println("Exceptions: " + exceptions.size()); + configuration.printNumWaits(); } } diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/ChwHomeTcpTraceReconstructionAnalysisTest.java b/src/performancetest/java/teetime/examples/traceReconstruction/ChwHomeTcpTraceReconstructionAnalysisTest.java index 825c2501..811d030f 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/ChwHomeTcpTraceReconstructionAnalysisTest.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/ChwHomeTcpTraceReconstructionAnalysisTest.java @@ -61,7 +61,7 @@ public class ChwHomeTcpTraceReconstructionAnalysisTest { @Test public void performAnalysis() { - final TcpTraceReconstructionAnalysisConfiguration configuration = new TcpTraceReconstructionAnalysisConfiguration(); + final TcpTraceReconstructionConf configuration = new TcpTraceReconstructionConf(); final Analysis analysis = new Analysis(configuration); analysis.init(); diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java b/src/performancetest/java/teetime/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java index 6a9d9a73..bf37a514 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java @@ -31,6 +31,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import teetime.framework.Analysis; import teetime.util.StopWatch; import util.test.StatisticsUtil; @@ -59,7 +60,10 @@ public class ChwHomeTraceReconstructionAnalysisTest { @Test public void performAnalysisWithEprintsLogs() { - final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File(RESOURCE_DIR + "data/Eprints-logs")); + final TraceReconstructionConf configuration = new TraceReconstructionConf(new File(RESOURCE_DIR + "data/Eprints-logs")); + + Analysis analysis = new Analysis(configuration); + analysis.init(); this.stopWatch.start(); try { @@ -68,17 +72,17 @@ public class ChwHomeTraceReconstructionAnalysisTest { this.stopWatch.end(); } - StatisticsUtil.removeLeadingZeroThroughputs(analysis.getThroughputs()); - Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); + StatisticsUtil.removeLeadingZeroThroughputs(configuration.getThroughputs()); + Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(configuration.getThroughputs()); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); - assertEquals(50002, analysis.getNumRecords()); - assertEquals(2, analysis.getNumTraces()); + assertEquals(50002, configuration.getNumRecords()); + assertEquals(2, configuration.getNumTraces()); - TraceEventRecords trace6884 = analysis.getElementCollection().get(0); + TraceEventRecords trace6884 = configuration.getElementCollection().get(0); assertEquals(6884, trace6884.getTraceMetadata().getTraceId()); - TraceEventRecords trace6886 = analysis.getElementCollection().get(1); + TraceEventRecords trace6886 = configuration.getElementCollection().get(1); assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); assertThat(quintiles.get(0.5), is(both(greaterThan(0l)).and(lessThan(2l)))); @@ -86,7 +90,10 @@ public class ChwHomeTraceReconstructionAnalysisTest { @Test public void performAnalysisWithKiekerLogs() { - final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File(RESOURCE_DIR + "data/kieker-logs")); + final TraceReconstructionConf configuration = new TraceReconstructionConf(new File(RESOURCE_DIR + "data/kieker-logs")); + + Analysis analysis = new Analysis(configuration); + analysis.init(); this.stopWatch.start(); try { @@ -95,17 +102,17 @@ public class ChwHomeTraceReconstructionAnalysisTest { this.stopWatch.end(); } - StatisticsUtil.removeLeadingZeroThroughputs(analysis.getThroughputs()); - Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); + StatisticsUtil.removeLeadingZeroThroughputs(configuration.getThroughputs()); + Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(configuration.getThroughputs()); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); - assertEquals(1489902, analysis.getNumRecords()); - assertEquals(24013, analysis.getNumTraces()); + assertEquals(1489902, configuration.getNumRecords()); + assertEquals(24013, configuration.getNumTraces()); - TraceEventRecords trace0 = analysis.getElementCollection().get(0); + TraceEventRecords trace0 = configuration.getElementCollection().get(0); assertEquals(8974347286117089280l, trace0.getTraceMetadata().getTraceId()); - TraceEventRecords trace1 = analysis.getElementCollection().get(1); + TraceEventRecords trace1 = configuration.getElementCollection().get(1); assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId()); assertThat(quintiles.get(0.5), is(both(greaterThan(2100l)).and(lessThan(2200l)))); @@ -113,7 +120,10 @@ public class ChwHomeTraceReconstructionAnalysisTest { @Test public void performAnalysisWithKieker2Logs() { - final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File(RESOURCE_DIR + "data/kieker2-logs")); + final TraceReconstructionConf configuration = new TraceReconstructionConf(new File(RESOURCE_DIR + "data/kieker2-logs")); + + Analysis analysis = new Analysis(configuration); + analysis.init(); this.stopWatch.start(); try { @@ -122,19 +132,19 @@ public class ChwHomeTraceReconstructionAnalysisTest { this.stopWatch.end(); } - StatisticsUtil.removeLeadingZeroThroughputs(analysis.getThroughputs()); - assertTrue(analysis.getThroughputs().isEmpty()); + StatisticsUtil.removeLeadingZeroThroughputs(configuration.getThroughputs()); + assertTrue(configuration.getThroughputs().isEmpty()); // Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); // System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); - assertEquals(17371, analysis.getNumRecords()); - assertEquals(22, analysis.getNumTraces()); + assertEquals(17371, configuration.getNumRecords()); + assertEquals(22, configuration.getNumTraces()); - TraceEventRecords trace0 = analysis.getElementCollection().get(0); + TraceEventRecords trace0 = configuration.getElementCollection().get(0); assertEquals(0, trace0.getTraceMetadata().getTraceId()); - TraceEventRecords trace1 = analysis.getElementCollection().get(1); + TraceEventRecords trace1 = configuration.getElementCollection().get(1); assertEquals(1, trace1.getTraceMetadata().getTraceId()); // assertThat(quintiles.get(0.5), is(both(greaterThan(200l)).and(lessThan(250l)))); diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java b/src/performancetest/java/teetime/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java index 7c7cc3b0..e2d6fca3 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java @@ -54,7 +54,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest { @Test public void performAnalysis() { - final TcpTraceReconstructionAnalysisConfiguration configuration = new TcpTraceReconstructionAnalysisConfiguration(); + final TcpTraceReconstructionConf configuration = new TcpTraceReconstructionConf(); Analysis analysis = new Analysis(configuration); analysis.init(); diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java b/src/performancetest/java/teetime/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java index 9fb77d98..4ef91f30 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java @@ -30,6 +30,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import teetime.framework.Analysis; import teetime.util.StopWatch; import util.test.StatisticsUtil; @@ -38,9 +39,9 @@ import kieker.analysis.plugin.filter.flow.TraceEventRecords; /** * @author Christian Wulf * - * @since 1.10 + * @since 1.0 */ -public class ChwWorkTraceReconstructionAnalysisTest { +class ChwWorkTraceReconstructionAnalysisTest { private StopWatch stopWatch; @@ -57,7 +58,10 @@ public class ChwWorkTraceReconstructionAnalysisTest { @Test public void performAnalysisWithEprintsLogs() { - final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File("src/test/data/Eprints-logs")); + final TraceReconstructionConf configuration = new TraceReconstructionConf(new File("src/test/data/Eprints-logs")); + + Analysis analysis = new Analysis(configuration); + analysis.init(); this.stopWatch.start(); try { @@ -66,23 +70,26 @@ public class ChwWorkTraceReconstructionAnalysisTest { this.stopWatch.end(); } - assertEquals(50002, analysis.getNumRecords()); - assertEquals(2, analysis.getNumTraces()); + assertEquals(50002, configuration.getNumRecords()); + assertEquals(2, configuration.getNumTraces()); - TraceEventRecords trace6884 = analysis.getElementCollection().get(0); + TraceEventRecords trace6884 = configuration.getElementCollection().get(0); assertEquals(6884, trace6884.getTraceMetadata().getTraceId()); - TraceEventRecords trace6886 = analysis.getElementCollection().get(1); + TraceEventRecords trace6886 = configuration.getElementCollection().get(1); assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); - StatisticsUtil.removeLeadingZeroThroughputs(analysis.getThroughputs()); - Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); + StatisticsUtil.removeLeadingZeroThroughputs(configuration.getThroughputs()); + Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(configuration.getThroughputs()); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); } @Test public void performAnalysisWithKiekerLogs() { - final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File("src/test/data/kieker-logs")); + final TraceReconstructionConf configuration = new TraceReconstructionConf(new File("src/test/data/kieker-logs")); + + Analysis analysis = new Analysis(configuration); + analysis.init(); this.stopWatch.start(); try { @@ -91,17 +98,17 @@ public class ChwWorkTraceReconstructionAnalysisTest { this.stopWatch.end(); } - assertEquals(1489902, analysis.getNumRecords()); - assertEquals(24013, analysis.getNumTraces()); + assertEquals(1489902, configuration.getNumRecords()); + assertEquals(24013, configuration.getNumTraces()); - TraceEventRecords trace0 = analysis.getElementCollection().get(0); + TraceEventRecords trace0 = configuration.getElementCollection().get(0); assertEquals(8974347286117089280l, trace0.getTraceMetadata().getTraceId()); - TraceEventRecords trace1 = analysis.getElementCollection().get(1); + TraceEventRecords trace1 = configuration.getElementCollection().get(1); assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId()); - StatisticsUtil.removeLeadingZeroThroughputs(analysis.getThroughputs()); - Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); + StatisticsUtil.removeLeadingZeroThroughputs(configuration.getThroughputs()); + Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(configuration.getThroughputs()); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l)))); @@ -109,7 +116,10 @@ public class ChwWorkTraceReconstructionAnalysisTest { @Test public void performAnalysisWithKieker2Logs() { - final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File("src/test/data/kieker2-logs")); + final TraceReconstructionConf configuration = new TraceReconstructionConf(new File("src/test/data/kieker2-logs")); + + Analysis analysis = new Analysis(configuration); + analysis.init(); this.stopWatch.start(); try { @@ -118,17 +128,17 @@ public class ChwWorkTraceReconstructionAnalysisTest { this.stopWatch.end(); } - assertEquals(17371, analysis.getNumRecords()); - assertEquals(22, analysis.getNumTraces()); + assertEquals(17371, configuration.getNumRecords()); + assertEquals(22, configuration.getNumTraces()); - TraceEventRecords trace0 = analysis.getElementCollection().get(0); + TraceEventRecords trace0 = configuration.getElementCollection().get(0); assertEquals(0, trace0.getTraceMetadata().getTraceId()); - TraceEventRecords trace1 = analysis.getElementCollection().get(1); + TraceEventRecords trace1 = configuration.getElementCollection().get(1); assertEquals(1, trace1.getTraceMetadata().getTraceId()); - StatisticsUtil.removeLeadingZeroThroughputs(analysis.getThroughputs()); - Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); + StatisticsUtil.removeLeadingZeroThroughputs(configuration.getThroughputs()); + Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(configuration.getThroughputs()); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); } diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysisConfiguration.java b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionConf.java similarity index 97% rename from src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysisConfiguration.java rename to src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionConf.java index be1a78b4..2f8ddd21 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysisConfiguration.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionConf.java @@ -24,7 +24,7 @@ import kieker.analysis.plugin.filter.flow.TraceEventRecords; import kieker.common.record.IMonitoringRecord; import kieker.common.record.flow.IFlowRecord; -public class TcpTraceReconstructionAnalysisConfiguration extends AnalysisConfiguration { +public class TcpTraceReconstructionConf extends AnalysisConfiguration { private static final int MIO = 1000000; private static final int TCP_RELAY_MAX_SIZE = 2 * MIO; @@ -41,7 +41,7 @@ public class TcpTraceReconstructionAnalysisConfiguration extends AnalysisConfigu private final IPipeFactory intraThreadPipeFactory; private final IPipeFactory interThreadPipeFactory; - public TcpTraceReconstructionAnalysisConfiguration() { + public TcpTraceReconstructionConf() { intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); init(); diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionConf.java similarity index 88% rename from src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java rename to src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionConf.java index 3341f172..28adcc9e 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionConf.java @@ -5,7 +5,6 @@ import java.util.LinkedList; import java.util.List; import teetime.framework.AnalysisConfiguration; -import teetime.framework.RunnableStage; import teetime.framework.Stage; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; @@ -31,14 +30,10 @@ import kieker.analysis.plugin.filter.flow.TraceEventRecords; import kieker.common.record.IMonitoringRecord; import kieker.common.record.flow.IFlowRecord; -// TODO extends AnalysisConfiguration -public class TraceReconstructionAnalysis extends AnalysisConfiguration { +public class TraceReconstructionConf extends AnalysisConfiguration { private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); - private Thread clockThread; - private Thread workerThread; - private final File inputDir; private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace; private final IPipeFactory intraThreadPipeFactory; @@ -49,7 +44,7 @@ public class TraceReconstructionAnalysis extends AnalysisConfiguration { private Counter<TraceEventRecords> traceCounter; private ElementThroughputMeasuringStage<IFlowRecord> throughputFilter; - public TraceReconstructionAnalysis(final File inputDir) { + public TraceReconstructionConf(final File inputDir) { this.inputDir = inputDir; traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); @@ -59,10 +54,10 @@ public class TraceReconstructionAnalysis extends AnalysisConfiguration { private void init() { Clock clockStage = this.buildClockPipeline(); - this.clockThread = new Thread(new RunnableStage(clockStage)); + addThreadableStage(clockStage); Stage pipeline = this.buildPipeline(clockStage); - this.workerThread = new Thread(new RunnableStage(pipeline)); + addThreadableStage(pipeline); } private Clock buildClockPipeline() { @@ -113,25 +108,6 @@ public class TraceReconstructionAnalysis extends AnalysisConfiguration { return initialElementProducer; } - public void start() { - - this.clockThread.start(); - this.workerThread.start(); - - try { - this.workerThread.join(); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - - this.clockThread.interrupt(); - try { - this.clockThread.join(); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - } - public List<TraceEventRecords> getElementCollection() { return this.elementCollection; } -- GitLab