From e5ae5810b73c6d8a4b53e9edc03de3a9c35ccea7 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Fri, 5 Dec 2014 09:44:01 +0100
Subject: [PATCH] migrated TcpTraceReconstructionAnalysis*

---
 ...ceReconstructionAnalysisConfiguration.java | 11 +--
 ...ReconstructionAnalysisWithThreadsTest.java | 11 +--
 ...ReconstructionAnalysisWithThreadsTest.java | 13 +--
 ...ctionAnalysisWithThreadsConfiguration.java | 87 +++++++++++--------
 4 files changed, 57 insertions(+), 65 deletions(-)

diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysisConfiguration.java b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysisConfiguration.java
index 9b2a5e47..7d846870 100644
--- a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysisConfiguration.java
+++ b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysisConfiguration.java
@@ -5,7 +5,6 @@ import java.util.List;
 
 import teetime.framework.AnalysisConfiguration;
 import teetime.framework.IStage;
-import teetime.framework.RunnableStage;
 import teetime.framework.pipe.IPipeFactory;
 import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
 import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
@@ -32,10 +31,6 @@ public class TcpTraceReconstructionAnalysisConfiguration extends AnalysisConfigu
 
 	private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
 
-	private Thread clockThread;
-	private Thread clock2Thread;
-	private Thread workerThread;
-
 	private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
 
 	private Counter<IMonitoringRecord> recordCounter;
@@ -54,13 +49,13 @@ public class TcpTraceReconstructionAnalysisConfiguration extends AnalysisConfigu
 
 	private void init() {
 		Pipeline<Distributor<Long>> clockStage = this.buildClockPipeline(1000);
-		this.clockThread = new Thread(new RunnableStage(clockStage));
+		addThreadableStage(clockStage);
 
 		Pipeline<Distributor<Long>> clock2Stage = this.buildClockPipeline(2000);
-		this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
+		addThreadableStage(clock2Stage);
 
 		IStage pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage());
-		this.workerThread = new Thread(new RunnableStage(pipeline));
+		addThreadableStage(pipeline);
 	}
 
 	private Pipeline<Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
diff --git a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java
index 5e3af5a5..aa4b4f0b 100644
--- a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java
+++ b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java
@@ -33,7 +33,6 @@ import org.junit.Test;
 import org.junit.runners.MethodSorters;
 
 import teetime.framework.Analysis;
-import teetime.framework.pipe.SpScPipe;
 import teetime.util.ListUtil;
 import teetime.util.StopWatch;
 import util.test.StatisticsUtil;
@@ -79,9 +78,7 @@ public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest {
 	}
 
 	void performAnalysis(final int numWorkerThreads) {
-		final TcpTraceReconstructionAnalysisWithThreadsConfiguration configuration = new TcpTraceReconstructionAnalysisWithThreadsConfiguration();
-		configuration.setNumWorkerThreads(numWorkerThreads);
-		configuration.buildConfiguration();
+		final TcpTraceReconstructionAnalysisWithThreadsConfiguration configuration = new TcpTraceReconstructionAnalysisWithThreadsConfiguration(numWorkerThreads);
 
 		Analysis analysis = new Analysis(configuration);
 		analysis.init();
@@ -93,11 +90,7 @@ public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest {
 			this.stopWatch.end();
 		}
 
-		int maxNumWaits = 0;
-		for (SpScPipe pipe : configuration.getTcpRelayPipes()) {
-			maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits());
-		}
-		System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits);
+		System.out.println("max #waits of TcpRelayPipes: " + configuration.getMaxNumWaits());
 
 		// System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas());
 		// System.out.println("Max #trace created: " + analysis.getMaxElementsCreated());
diff --git a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
index 3082beaf..b76f39fc 100644
--- a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
+++ b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java
@@ -28,7 +28,6 @@ import org.junit.Test;
 import org.junit.runners.MethodSorters;
 
 import teetime.framework.Analysis;
-import teetime.framework.pipe.SpScPipe;
 import teetime.util.ListUtil;
 import teetime.util.StopWatch;
 import util.test.StatisticsUtil;
@@ -87,11 +86,9 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
 	// Duration: 22373 ms
 
 	void performAnalysis(final int numWorkerThreads) {
-		final TcpTraceReconstructionAnalysisWithThreadsConfiguration configuration = new TcpTraceReconstructionAnalysisWithThreadsConfiguration();
-		configuration.setNumWorkerThreads(numWorkerThreads);
-		configuration.buildConfiguration();
+		final TcpTraceReconstructionAnalysisWithThreadsConfiguration configuration = new TcpTraceReconstructionAnalysisWithThreadsConfiguration(numWorkerThreads);
 
-		Analysis analysis = new Analysis(configuration);
+		final Analysis analysis = new Analysis(configuration);
 		analysis.init();
 
 		this.stopWatch.start();
@@ -101,11 +98,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
 			this.stopWatch.end();
 		}
 
-		int maxNumWaits = 0;
-		for (SpScPipe pipe : configuration.getTcpRelayPipes()) {
-			maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits());
-		}
-		System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits);
+		System.out.println("max #waits of TcpRelayPipes: " + configuration.getMaxNumWaits());
 
 		// System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas());
 		// System.out.println("Max #trace created: " + analysis.getMaxElementsCreated());
diff --git a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java
index 15c606b4..055a7895 100644
--- a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java
+++ b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java
@@ -9,7 +9,10 @@ import java.util.List;
 import teetime.framework.AbstractStage;
 import teetime.framework.AnalysisConfiguration;
 import teetime.framework.IStage;
-import teetime.framework.pipe.SingleElementPipe;
+import teetime.framework.pipe.IPipe;
+import teetime.framework.pipe.IPipeFactory;
+import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
+import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.framework.pipe.SpScPipe;
 import teetime.stage.Clock;
 import teetime.stage.Counter;
@@ -17,6 +20,7 @@ import teetime.stage.ElementDelayMeasuringStage;
 import teetime.stage.ElementThroughputMeasuringStage;
 import teetime.stage.InstanceCounter;
 import teetime.stage.InstanceOfFilter;
+import teetime.stage.Pipeline;
 import teetime.stage.Relay;
 import teetime.stage.basic.Sink;
 import teetime.stage.basic.distributor.Distributor;
@@ -38,7 +42,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
 
 	private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
 
-	private int numWorkerThreads;
+	private final int numWorkerThreads;
 
 	private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
 
@@ -50,11 +54,14 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
 	private final StageFactory<Counter<TraceEventRecords>> traceCounterFactory;
 	private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory;
 
-	private final List<SpScPipe> tcpRelayPipes = new LinkedList<SpScPipe>();
+	private final List<IPipe> tcpRelayPipes = new LinkedList<IPipe>();
+	private final IPipeFactory intraThreadPipeFactory;
+	private final IPipeFactory interThreadPipeFactory;
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public TcpTraceReconstructionAnalysisWithThreadsConfiguration() {
+	public TcpTraceReconstructionAnalysisWithThreadsConfiguration(final int numWorkerThreads) {
 		super();
+		this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, numWorkerThreads);
 
 		try {
 			this.recordCounterFactory = new StageFactory(Counter.class.getConstructor());
@@ -69,43 +76,46 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
 		} catch (SecurityException e) {
 			throw new IllegalArgumentException(e);
 		}
+
+		intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
+		interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
+		init();
 	}
 
-	public void buildConfiguration() {
-		final IStage tcpPipeline = this.buildTcpPipeline();
-		this.getFiniteProducerStages().add(tcpPipeline);
+	private void init() {
+		Pipeline<Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
+		addThreadableStage(tcpPipeline);
 
-		final IStage clockStage = this.buildClockPipeline(1000);
-		this.getInfiniteProducerStages().add(clockStage);
+		Pipeline<Distributor<Long>> clockStage = this.buildClockPipeline(1000);
+		addThreadableStage(clockStage);
 
-		final IStage clock2Stage = this.buildClockPipeline(2000);
-		this.getInfiniteProducerStages().add(clock2Stage);
+		Pipeline<Distributor<Long>> clock2Stage = this.buildClockPipeline(2000);
+		addThreadableStage(clock2Stage);
 
-		this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
 		for (int i = 0; i < this.numWorkerThreads; i++) {
 			IStage pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), clock2Stage.getLastStage());
-			this.getConsumerStages().add(pipeline);
+			addThreadableStage(pipeline);
 		}
 	}
 
-	private IStage buildTcpPipeline() {
+	private Pipeline<Distributor<IMonitoringRecord>> buildTcpPipeline() {
 		TcpReader tcpReader = new TcpReader();
 		Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
 
-		SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
+		intraThreadPipeFactory.create(tcpReader.getOutputPort(), distributor.getInputPort());
 
-		return tcpReader;
+		return new Pipeline<Distributor<IMonitoringRecord>>(tcpReader, distributor);
 	}
 
-	private IStage buildClockPipeline(final long intervalDelayInMs) {
+	private Pipeline<Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
 		Clock clock = new Clock();
 		clock.setInitialDelayInMs(intervalDelayInMs);
 		clock.setIntervalDelayInMs(intervalDelayInMs);
 		Distributor<Long> distributor = new Distributor<Long>();
 
-		SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
+		intraThreadPipeFactory.create(clock.getOutputPort(), distributor.getInputPort());
 
-		return clock;
+		return new Pipeline<Distributor<Long>>(clock, distributor);
 	}
 
 	private static class StageFactory<T extends AbstractStage> {
@@ -155,22 +165,22 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
 		// EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
 
 		// connect stages
-		SpScPipe tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
+		IPipe tcpRelayPipe = interThreadPipeFactory.create(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
 		this.tcpRelayPipes.add(tcpRelayPipe);
 		// SysOutFilter<TraceEventRecords> sysout = new SysOutFilter<TraceEventRecords>(tcpRelayPipe);
 
-		SpScPipe.connect(clockStage.getNewOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10);
-		SpScPipe.connect(clock2Stage.getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
+		interThreadPipeFactory.create(clockStage.getNewOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10);
+		interThreadPipeFactory.create(clock2Stage.getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
 
-		SingleElementPipe.connect(relay.getOutputPort(), recordCounter.getInputPort());
-		SingleElementPipe.connect(recordCounter.getOutputPort(), recordThroughputFilter.getInputPort());
-		SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), traceMetadataCounter.getInputPort());
-		SingleElementPipe.connect(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort());
-		SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
-		SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), traceCounter.getInputPort());
-		// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
-		// SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort());
-		SingleElementPipe.connect(traceCounter.getOutputPort(), endStage.getInputPort());
+		intraThreadPipeFactory.create(relay.getOutputPort(), recordCounter.getInputPort());
+		intraThreadPipeFactory.create(recordCounter.getOutputPort(), recordThroughputFilter.getInputPort());
+		intraThreadPipeFactory.create(recordThroughputFilter.getOutputPort(), traceMetadataCounter.getInputPort());
+		intraThreadPipeFactory.create(traceMetadataCounter.getOutputPort(), instanceOfFilter.getInputPort());
+		intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
+		intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), traceCounter.getInputPort());
+		// intraThreadPipeFactory.create(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
+		// intraThreadPipeFactory.create(traceThroughputFilter.getOutputPort(), traceCounter.getInputPort());
+		intraThreadPipeFactory.create(traceCounter.getOutputPort(), endStage.getInputPort());
 
 		return relay;
 	}
@@ -196,7 +206,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
 	}
 
 	public List<Long> getRecordDelays() {
-		List<Long> throughputs = new LinkedList<Long>();
+		final List<Long> throughputs = new LinkedList<Long>();
 		for (ElementDelayMeasuringStage<IMonitoringRecord> stage : this.recordDelayFilterFactory.getStages()) {
 			throughputs.addAll(stage.getDelays());
 		}
@@ -227,18 +237,19 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
 		return numTraceMetadatas;
 	}
 
-	public List<SpScPipe> getTcpRelayPipes() {
-		return this.tcpRelayPipes;
+	public int getMaxNumWaits() {
+		int maxNumWaits = 0;
+		for (IPipe pipe : this.tcpRelayPipes) {
+			SpScPipe interThreadPipe = (SpScPipe) pipe;
+			maxNumWaits = Math.max(maxNumWaits, interThreadPipe.getNumWaits());
+		}
+		return maxNumWaits;
 	}
 
 	public int getNumWorkerThreads() {
 		return this.numWorkerThreads;
 	}
 
-	public void setNumWorkerThreads(final int numWorkerThreads) {
-		this.numWorkerThreads = numWorkerThreads;
-	}
-
 	public int getMaxElementsCreated() {
 		return this.traceId2trace.getMaxElements();
 	}
-- 
GitLab