From 3539ee892dda5d64706c3782893d1f5633f38724 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Fri, 5 Dec 2014 08:50:29 +0100
Subject: [PATCH] fixed TcpTraceReduction

---
 src/main/java/teetime/stage/Pipeline.java     |  2 +-
 .../kiekerdays/TcpTraceReduction.java         | 51 ++++++++++++-------
 2 files changed, 34 insertions(+), 19 deletions(-)

diff --git a/src/main/java/teetime/stage/Pipeline.java b/src/main/java/teetime/stage/Pipeline.java
index 02d8fb7c..0e23b41c 100644
--- a/src/main/java/teetime/stage/Pipeline.java
+++ b/src/main/java/teetime/stage/Pipeline.java
@@ -15,7 +15,7 @@ import teetime.framework.validation.InvalidPortConnection;
  * @param <L>
  *            the type of the last stage in this pipeline
  */
-public class Pipeline<L extends IStage> implements IStage {
+public final class Pipeline<L extends IStage> implements IStage {
 
 	private final IStage firstStage;
 	private final L lastStage;
diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java
index 9e7c9e09..8d7a3c2a 100644
--- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java
+++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java
@@ -8,10 +8,15 @@ import java.util.TreeMap;
 
 import teetime.framework.IStage;
 import teetime.framework.RunnableStage;
-import teetime.framework.pipe.SingleElementPipe;
+import teetime.framework.pipe.IPipe;
+import teetime.framework.pipe.IPipeFactory;
+import teetime.framework.pipe.PipeFactoryRegistry;
+import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
+import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.framework.pipe.SpScPipe;
 import teetime.stage.Clock;
 import teetime.stage.InstanceOfFilter;
+import teetime.stage.Pipeline;
 import teetime.stage.Relay;
 import teetime.stage.basic.Sink;
 import teetime.stage.basic.distributor.Distributor;
@@ -36,7 +41,7 @@ public class TcpTraceReduction {
 	private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
 	private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
 	private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator());
-	private final List<SpScPipe> tcpRelayPipes = new ArrayList<SpScPipe>();
+	private final List<IPipe> tcpRelayPipes = new ArrayList<IPipe>();
 
 	private Thread tcpThread;
 	private Thread clockThread;
@@ -44,11 +49,19 @@ public class TcpTraceReduction {
 
 	private int numWorkerThreads;
 
+	private final IPipeFactory intraThreadPipeFactory;
+	private final IPipeFactory interThreadPipeFactory;
+
+	public TcpTraceReduction() {
+		intraThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
+		interThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
+	}
+
 	public void init() {
-		IStage tcpPipeline = this.buildTcpPipeline();
+		Pipeline<Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
 		this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
 
-		IStage clockStage = this.buildClockPipeline(5000);
+		Pipeline<Distributor<Long>> clockStage = this.buildClockPipeline(5000);
 		this.clockThread = new Thread(new RunnableStage(clockStage));
 
 		this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
@@ -60,24 +73,24 @@ public class TcpTraceReduction {
 		}
 	}
 
-	private IStage buildTcpPipeline() {
+	private Pipeline<Distributor<IMonitoringRecord>> buildTcpPipeline() {
 		TcpReader tcpReader = new TcpReader();
 		Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
 
-		SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
+		intraThreadPipeFactory.create(tcpReader.getOutputPort(), distributor.getInputPort());
 
-		return tcpReader;
+		return new Pipeline<Distributor<IMonitoringRecord>>(tcpReader, distributor);
 	}
 
-	private IStage buildClockPipeline(final long intervalDelayInMs) {
+	private Pipeline<Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
 		Clock clock = new Clock();
 		clock.setInitialDelayInMs(intervalDelayInMs);
 		clock.setIntervalDelayInMs(intervalDelayInMs);
 		Distributor<Long> distributor = new Distributor<Long>();
 
-		SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
+		intraThreadPipeFactory.create(clock.getOutputPort(), distributor.getInputPort());
 
-		return clock;
+		return new Pipeline<Distributor<Long>>(clock, distributor);
 	}
 
 	private IStage buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, final Distributor<Long> clockStage) {
@@ -90,15 +103,15 @@ public class TcpTraceReduction {
 		Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>();
 
 		// connect stages
-		SpScPipe tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
+		IPipe tcpRelayPipe = interThreadPipeFactory.create(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
 		this.tcpRelayPipes.add(tcpRelayPipe);
 
-		SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
-		SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
-		SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), traceReductionFilter.getInputPort());
-		SingleElementPipe.connect(traceReductionFilter.getOutputPort(), endStage.getInputPort());
+		intraThreadPipeFactory.create(relay.getOutputPort(), instanceOfFilter.getInputPort());
+		intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
+		intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), traceReductionFilter.getInputPort());
+		intraThreadPipeFactory.create(traceReductionFilter.getOutputPort(), endStage.getInputPort());
 
-		SpScPipe.connect(clockStage.getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
+		interThreadPipeFactory.create(clockStage.getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
 
 		return relay;
 	}
@@ -126,8 +139,10 @@ public class TcpTraceReduction {
 
 	public void onTerminate() {
 		int maxNumWaits = 0;
-		for (SpScPipe pipe : this.tcpRelayPipes) {
-			maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits());
+		for (IPipe pipe : this.tcpRelayPipes) {
+			SpScPipe interThreadPipe = (SpScPipe) pipe;
+			// TODO introduce IInterThreadPipe
+			maxNumWaits = Math.max(maxNumWaits, interThreadPipe.getNumWaits());
 		}
 		System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits);
 	}
-- 
GitLab