From ab6e659dea570296fc34f12bc4fe650dbb627f2c Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Fri, 5 Dec 2014 08:57:10 +0100
Subject: [PATCH] migrated TcpTraceLoggingExtAnalysis to configuration

---
 .../RecordReaderConfiguration.java            | 12 +--
 .../ChwHomeTcpTraceReadingTest.java           |  9 +-
 .../TcpTraceLoggingExtAnalysis.java           | 83 -------------------
 ...pTraceLoggingExtAnalysisConfiguration.java | 80 ++++++++++++++++++
 4 files changed, 92 insertions(+), 92 deletions(-)
 delete mode 100644 src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java
 create mode 100644 src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java

diff --git a/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java b/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java
index 0f8000e8..2a0ad78c 100644
--- a/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java
+++ b/src/performancetest/java/teetime/examples/recordReader/RecordReaderConfiguration.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import teetime.framework.AnalysisConfiguration;
 import teetime.framework.IStage;
+import teetime.framework.pipe.IPipeFactory;
 import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
 import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.stage.CollectorSink;
@@ -38,14 +39,16 @@ import kieker.common.record.IMonitoringRecord;
 public class RecordReaderConfiguration extends AnalysisConfiguration {
 
 	private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>();
+	private final IPipeFactory intraThreadPipeFactory;
 
 	public RecordReaderConfiguration() {
+		intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
 		this.buildConfiguration();
 	}
 
 	private void buildConfiguration() {
 		IStage producerPipeline = this.buildProducerPipeline();
-		this.getFiniteProducerStages().add(producerPipeline);
+		addThreadableStage(producerPipeline);
 	}
 
 	private IStage buildProducerPipeline() {
@@ -57,11 +60,8 @@ public class RecordReaderConfiguration extends AnalysisConfiguration {
 		CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection);
 
 		// connect stages
-		PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false)
-				.create(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort());
-
-		PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false)
-				.create(dir2RecordsFilter.getOutputPort(), collector.getInputPort());
+		intraThreadPipeFactory.create(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort());
+		intraThreadPipeFactory.create(dir2RecordsFilter.getOutputPort(), collector.getInputPort());
 
 		return initialElementProducer;
 	}
diff --git a/src/performancetest/java/teetime/examples/traceReading/ChwHomeTcpTraceReadingTest.java b/src/performancetest/java/teetime/examples/traceReading/ChwHomeTcpTraceReadingTest.java
index 6b23a328..7426d4e2 100644
--- a/src/performancetest/java/teetime/examples/traceReading/ChwHomeTcpTraceReadingTest.java
+++ b/src/performancetest/java/teetime/examples/traceReading/ChwHomeTcpTraceReadingTest.java
@@ -32,6 +32,7 @@ import org.junit.FixMethodOrder;
 import org.junit.Test;
 import org.junit.runners.MethodSorters;
 
+import teetime.framework.Analysis;
 import teetime.util.ListUtil;
 import teetime.util.StopWatch;
 import util.test.StatisticsUtil;
@@ -63,7 +64,9 @@ public class ChwHomeTcpTraceReadingTest {
 
 	@Test
 	public void performAnalysis() {
-		final TcpTraceLoggingExtAnalysis analysis = new TcpTraceLoggingExtAnalysis();
+		final TcpTraceLoggingExtAnalysisConfiguration configuration = new TcpTraceLoggingExtAnalysisConfiguration();
+
+		final Analysis analysis = new Analysis(configuration);
 		analysis.init();
 
 		this.stopWatch.start();
@@ -73,11 +76,11 @@ public class ChwHomeTcpTraceReadingTest {
 			this.stopWatch.end();
 		}
 
-		List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(analysis.getRecordThroughputs());
+		List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(configuration.getRecordThroughputs());
 		Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(recordThroughputs);
 		System.out.println("Median record throughput: " + recordQuintiles.get(0.5) + " records/time unit");
 
-		assertEquals("#records", EXPECTED_NUM_RECORDS, analysis.getNumRecords());
+		assertEquals("#records", EXPECTED_NUM_RECORDS, configuration.getNumRecords());
 
 		// 08.07.2014 (incl.)
 		assertThat(recordQuintiles.get(0.5), is(both(greaterThan(3000L)).and(lessThan(3500L))));
diff --git a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java
deleted file mode 100644
index 6ab3f824..00000000
--- a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package teetime.examples.traceReading;
-
-import java.util.List;
-
-import teetime.framework.IStage;
-import teetime.framework.RunnableStage;
-import teetime.framework.pipe.SingleElementPipe;
-import teetime.framework.pipe.SpScPipe;
-import teetime.stage.Clock;
-import teetime.stage.Counter;
-import teetime.stage.ElementThroughputMeasuringStage;
-import teetime.stage.basic.Sink;
-import teetime.stage.basic.distributor.Distributor;
-import teetime.stage.io.network.TcpReader;
-
-import kieker.common.record.IMonitoringRecord;
-
-public class TcpTraceLoggingExtAnalysis {
-
-	private Thread clockThread;
-	private Thread tcpThread;
-
-	private Counter<IMonitoringRecord> recordCounter;
-	private ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputStage;
-
-	private IStage buildClockPipeline(final long intervalDelayInMs) {
-		Clock clockStage = new Clock();
-		clockStage.setInitialDelayInMs(intervalDelayInMs);
-		clockStage.setIntervalDelayInMs(intervalDelayInMs);
-		Distributor<Long> distributor = new Distributor<Long>();
-
-		SingleElementPipe.connect(clockStage.getOutputPort(), distributor.getInputPort());
-
-		return clockStage;
-	}
-
-	private IStage buildTcpPipeline(final Distributor<Long> previousClockStage) {
-		TcpReader tcpReader = new TcpReader();
-		this.recordCounter = new Counter<IMonitoringRecord>();
-		this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>();
-		Sink<IMonitoringRecord> endStage = new Sink<IMonitoringRecord>();
-
-		SingleElementPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort());
-		SingleElementPipe.connect(this.recordCounter.getOutputPort(), this.recordThroughputStage.getInputPort());
-		SingleElementPipe.connect(this.recordThroughputStage.getOutputPort(), endStage.getInputPort());
-		// SingleElementPipe.connect(this.recordCounter.getOutputPort(), endStage.getInputPort());
-
-		SpScPipe.connect(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10);
-
-		return tcpReader;
-	}
-
-	public void init() {
-		IStage clockPipeline = this.buildClockPipeline(1000);
-		this.clockThread = new Thread(new RunnableStage(clockPipeline));
-
-		IStage tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage());
-		this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
-	}
-
-	public void start() {
-
-		this.tcpThread.start();
-		this.clockThread.start();
-
-		try {
-			this.tcpThread.join();
-		} catch (InterruptedException e) {
-			throw new IllegalStateException(e);
-		}
-
-		this.clockThread.interrupt();
-	}
-
-	public int getNumRecords() {
-		return this.recordCounter.getNumElementsPassed();
-	}
-
-	public List<Long> getRecordThroughputs() {
-		return this.recordThroughputStage.getThroughputs();
-	}
-
-}
diff --git a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java
new file mode 100644
index 00000000..6c1e87ba
--- /dev/null
+++ b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysisConfiguration.java
@@ -0,0 +1,80 @@
+package teetime.examples.traceReading;
+
+import java.util.List;
+
+import teetime.framework.AnalysisConfiguration;
+import teetime.framework.IStage;
+import teetime.framework.RunnableStage;
+import teetime.framework.pipe.IPipeFactory;
+import teetime.framework.pipe.PipeFactoryRegistry;
+import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
+import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
+import teetime.stage.Clock;
+import teetime.stage.Counter;
+import teetime.stage.ElementThroughputMeasuringStage;
+import teetime.stage.Pipeline;
+import teetime.stage.basic.Sink;
+import teetime.stage.basic.distributor.Distributor;
+import teetime.stage.io.network.TcpReader;
+
+import kieker.common.record.IMonitoringRecord;
+
+public class TcpTraceLoggingExtAnalysisConfiguration extends AnalysisConfiguration {
+
+	private Thread clockThread;
+	private Thread tcpThread;
+
+	private Counter<IMonitoringRecord> recordCounter;
+	private ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputStage;
+	private final IPipeFactory intraThreadPipeFactory;
+	private final IPipeFactory interThreadPipeFactory;
+
+	public TcpTraceLoggingExtAnalysisConfiguration() {
+		intraThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
+		interThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
+	}
+
+	private Pipeline<Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
+		Clock clockStage = new Clock();
+		clockStage.setInitialDelayInMs(intervalDelayInMs);
+		clockStage.setIntervalDelayInMs(intervalDelayInMs);
+		Distributor<Long> distributor = new Distributor<Long>();
+
+		intraThreadPipeFactory.create(clockStage.getOutputPort(), distributor.getInputPort());
+
+		return new Pipeline<Distributor<Long>>(clockStage, distributor);
+	}
+
+	private IStage buildTcpPipeline(final Distributor<Long> previousClockStage) {
+		TcpReader tcpReader = new TcpReader();
+		this.recordCounter = new Counter<IMonitoringRecord>();
+		this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>();
+		Sink<IMonitoringRecord> endStage = new Sink<IMonitoringRecord>();
+
+		intraThreadPipeFactory.create(tcpReader.getOutputPort(), this.recordCounter.getInputPort());
+		intraThreadPipeFactory.create(this.recordCounter.getOutputPort(), this.recordThroughputStage.getInputPort());
+		intraThreadPipeFactory.create(this.recordThroughputStage.getOutputPort(), endStage.getInputPort());
+		// intraThreadPipeFactory.create(this.recordCounter.getOutputPort(), endStage.getInputPort());
+
+		interThreadPipeFactory.create(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10);
+
+		return tcpReader;
+	}
+
+	public void init() {
+		Pipeline<Distributor<Long>> clockPipeline = this.buildClockPipeline(1000);
+		this.clockThread = new Thread(new RunnableStage(clockPipeline));
+
+		IStage tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage());
+		this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
+	}
+
+	public int getNumRecords() {
+		return this.recordCounter.getNumElementsPassed();
+	}
+
+	public List<Long> getRecordThroughputs() {
+		return this.recordThroughputStage.getThroughputs();
+	}
+
+}
-- 
GitLab