From d4b35b76ae36b4a00f9bd30a5fb56684896755ff Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Sun, 6 Jul 2014 02:33:12 +0200
Subject: [PATCH] switched to the generic distributor

---
 conf/logging.properties                       |   4 +-
 .../framework/core/AbstractStage.java         |   3 +-
 .../stage/Distributor.java                    | 105 ------------------
 .../ElementThroughputMeasuringStage.java      |  24 ++--
 .../stage/basic/distributor/Distributor.java  |  42 +++++--
 .../MethodCallThroughputAnalysis16.java       |   2 +-
 .../MethodCallThroughputAnalysis17.java       |   2 +-
 .../MethodCallThroughputAnalysis18.java       |   2 +-
 .../MethodCallThroughputAnalysis19.java       |   2 +-
 .../kiekerdays/TcpTraceReconstruction.java    |   2 +-
 .../kiekerdays/TcpTraceReduction.java         |   2 +-
 .../TcpTraceReconstructionAnalysis.java       |   2 +-
 ...raceReconstructionAnalysisWithThreads.java |   2 +-
 .../TcpTraceReductionAnalysisWithThreads.java |   2 +-
 14 files changed, 55 insertions(+), 141 deletions(-)
 delete mode 100644 src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java

diff --git a/conf/logging.properties b/conf/logging.properties
index d8c2b84..6635420 100644
--- a/conf/logging.properties
+++ b/conf/logging.properties
@@ -8,6 +8,6 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
 #teetime.level = ALL
 
 #teetime.variant.methodcallWithPorts.framework.level = ALL
-#teetime.variant.methodcallWithPorts.framework.core.level = ALL
-#teetime.variant.methodcallWithPorts.stage.level = FINE
+teetime.variant.methodcallWithPorts.framework.core.level = FINE
+teetime.variant.methodcallWithPorts.stage.level = INFO
 #teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
index e2014a6..3391588 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
@@ -107,8 +107,7 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
 	 */
 	@Override
 	public void onSignal(final Signal signal, final InputPort<?> inputPort) {
-		this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
-		// System.out.println("Got signal: " + signal + " from input port: " + this.getClass().getSimpleName() + "." + inputPort);
+		this.logger.info("Got signal: " + signal + " from input port: " + inputPort);
 
 		switch (signal) {
 		case FINISHED:
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java
deleted file mode 100644
index c700ebf..0000000
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java
+++ /dev/null
@@ -1,105 +0,0 @@
-package teetime.variant.methodcallWithPorts.stage;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import teetime.util.concurrent.spsc.Pow2;
-import teetime.util.list.CommittableQueue;
-import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
-import teetime.variant.methodcallWithPorts.framework.core.InputPort;
-import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
-import teetime.variant.methodcallWithPorts.framework.core.Signal;
-
-public final class Distributor<T> extends AbstractStage<T, T> {
-
-	// TODO do not inherit from AbstractStage since it provides the default output port that is unnecessary for the distributor
-
-	private final List<OutputPort<T>> outputPortList = new ArrayList<OutputPort<T>>();
-
-	private OutputPort<T>[] outputPorts;
-	private int nextOutputPortIndex;
-
-	private int size;
-
-	// private int mask;
-
-	@Override
-	protected void execute4(final CommittableQueue<T> elements) {
-		// TODO Auto-generated method stub
-
-	}
-
-	@Override
-	protected void execute5(final T element) {
-		OutputPort<T> outputPort = this.outputPorts[this.nextOutputPortIndex % this.size];
-		this.nextOutputPortIndex++;
-		outputPort.send(element);
-	}
-
-	@Override
-	public void onIsPipelineHead() {
-		// for (OutputPort<?> op : this.outputPorts) {
-		// op.getPipe().close();
-		// if (this.logger.isDebugEnabled()) {
-		// this.logger.debug("End signal sent, size: " + op.getPipe().size());
-		// }
-		// }
-
-		// for (OutputPort<?> op : this.outputPorts) {
-		// op.pipe = null;
-		// }
-		// this.outputPorts = null;
-		// this.outputPortList.clear();
-	}
-
-	@Override
-	public void onSignal(final Signal signal, final InputPort<?> inputPort) {
-		this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
-		// System.out.println("Got signal: " + signal + " from input port: " + this.getClass().getSimpleName() + "." + inputPort);
-
-		switch (signal) {
-		case FINISHED:
-			this.onFinished();
-			break;
-		default:
-			this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
-			break;
-		}
-
-		for (OutputPort<?> op : this.outputPorts) {
-			op.sendSignal(signal);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void onStart() {
-		this.size = this.outputPortList.size();
-		// this.mask = this.size - 1;
-
-		int sizeInPow2 = Pow2.findNextPositivePowerOfTwo(this.size); // is not necessary so far
-		this.outputPorts = this.outputPortList.toArray(new OutputPort[sizeInPow2]);
-		// System.out.println("outputPorts: " + this.outputPorts);
-	}
-
-	@Override
-	public OutputPort<T> getOutputPort() {
-		return this.getNewOutputPort();
-	}
-
-	private OutputPort<T> getNewOutputPort() {
-		OutputPort<T> outputPort = new OutputPort<T>();
-		this.outputPortList.add(outputPort);
-		return outputPort;
-	}
-
-	@Override
-	public void executeWithPorts() {
-		T element = this.getInputPort().receive();
-
-		this.setReschedulable(this.getInputPort().getPipe().size() > 0);
-
-		this.execute5(element);
-	}
-
-}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java
index fd2bdfc..66b9bfd 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java
@@ -37,23 +37,23 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
 		long diffInNs = timestampInNs - this.lastTimestampInNs;
 
 		// BETTER use the TimeUnit of the clock
-		long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
-		if (diffInMs > 0) {
-			long throughputPerSec = this.numPassedElements / diffInMs;
-			this.throughputs.add(throughputPerSec);
-			this.logger.info("Throughput: " + throughputPerSec + " elements/ms" + " -> numPassedElements=" + this.numPassedElements);
+		long throughputPerTimeUnit = -1;
 
-			this.resetTimestamp(timestampInNs);
+		long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
+		if (diffInSec > 0) {
+			throughputPerTimeUnit = this.numPassedElements / diffInSec;
+			this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/s" + " -> numPassedElements=" + this.numPassedElements);
 		} else {
-			long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
-			if (diffInSec > 0) {
-				long throughputPerSec = this.numPassedElements / diffInSec;
-				this.throughputs.add(throughputPerSec);
-				this.logger.info("Throughput: " + throughputPerSec + " elements/s" + " -> numPassedElements=" + this.numPassedElements);
+			long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
+			if (diffInMs > 0) {
+				throughputPerTimeUnit = this.numPassedElements / diffInMs;
+				this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/ms" + " -> numPassedElements=" + this.numPassedElements);
 
-				this.resetTimestamp(timestampInNs);
 			}
 		}
+
+		this.throughputs.add(throughputPerTimeUnit);
+		this.resetTimestamp(timestampInNs);
 	}
 
 	private void resetTimestamp(final Long timestampInNs) {
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java
index 46454a8..dd9418a 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java
@@ -20,7 +20,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
+import teetime.variant.methodcallWithPorts.framework.core.InputPort;
 import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
+import teetime.variant.methodcallWithPorts.framework.core.Signal;
 
 /**
  * @author Christian Wulf
@@ -39,12 +41,13 @@ public class Distributor<T> extends AbstractStage<T, T> {
 
 	private IDistributorStrategy<T> strategy = new RoundRobinStrategy<T>();
 
-	public IDistributorStrategy<T> getStrategy() {
-		return this.strategy;
-	}
+	@Override
+	public void executeWithPorts() {
+		T element = this.getInputPort().receive();
 
-	public void setStrategy(final IDistributorStrategy<T> strategy) {
-		this.strategy = strategy;
+		this.setReschedulable(this.getInputPort().getPipe().size() > 0);
+
+		this.execute5(element);
 	}
 
 	@Override
@@ -60,6 +63,24 @@ public class Distributor<T> extends AbstractStage<T, T> {
 		// }
 	}
 
+	@Override
+	public void onSignal(final Signal signal, final InputPort<?> inputPort) {
+		this.logger.info("Got signal: " + signal + " from input port: " + inputPort);
+
+		switch (signal) {
+		case FINISHED:
+			this.onFinished();
+			break;
+		default:
+			this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
+			break;
+		}
+
+		for (OutputPort<T> op : this.outputPortList) {
+			op.sendSignal(signal);
+		}
+	}
+
 	@Override
 	public OutputPort<T> getOutputPort() {
 		return this.getNewOutputPort();
@@ -75,13 +96,12 @@ public class Distributor<T> extends AbstractStage<T, T> {
 		return this.outputPortList;
 	}
 
-	@Override
-	public void executeWithPorts() {
-		T element = this.getInputPort().receive();
-
-		this.setReschedulable(this.getInputPort().getPipe().size() > 0);
+	public IDistributorStrategy<T> getStrategy() {
+		return this.strategy;
+	}
 
-		this.execute5(element);
+	public void setStrategy(final IDistributorStrategy<T> strategy) {
+		this.strategy = strategy;
 	}
 
 }
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java
index b9dc83f..1e82e3f 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java
@@ -28,12 +28,12 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
 import teetime.variant.methodcallWithPorts.stage.CollectorSink;
-import teetime.variant.methodcallWithPorts.stage.Distributor;
 import teetime.variant.methodcallWithPorts.stage.NoopFilter;
 import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
 import teetime.variant.methodcallWithPorts.stage.Relay;
 import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter;
 import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter;
+import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
 
 /**
  * @author Christian Wulf
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java
index 1695493..ae2572f 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java
@@ -30,7 +30,6 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe;
 import teetime.variant.methodcallWithPorts.stage.CollectorSink;
-import teetime.variant.methodcallWithPorts.stage.Distributor;
 import teetime.variant.methodcallWithPorts.stage.EndStage;
 import teetime.variant.methodcallWithPorts.stage.NoopFilter;
 import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
@@ -38,6 +37,7 @@ import teetime.variant.methodcallWithPorts.stage.Relay;
 import teetime.variant.methodcallWithPorts.stage.Sink;
 import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter;
 import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter;
+import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
 
 /**
  * @author Christian Wulf
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java
index 593af76..9efe097 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java
@@ -28,12 +28,12 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe;
 import teetime.variant.methodcallWithPorts.stage.CollectorSink;
-import teetime.variant.methodcallWithPorts.stage.Distributor;
 import teetime.variant.methodcallWithPorts.stage.NoopFilter;
 import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
 import teetime.variant.methodcallWithPorts.stage.Relay;
 import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter;
 import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter;
+import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
 
 /**
  * @author Christian Wulf
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java
index 3ba4f46..6df61a3 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java
@@ -28,12 +28,12 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipe;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
 import teetime.variant.methodcallWithPorts.stage.CollectorSink;
-import teetime.variant.methodcallWithPorts.stage.Distributor;
 import teetime.variant.methodcallWithPorts.stage.NoopFilter;
 import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
 import teetime.variant.methodcallWithPorts.stage.Relay;
 import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter;
 import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter;
+import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
 
 /**
  * @author Christian Wulf
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java
index 335b092..bc169bd 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java
@@ -12,10 +12,10 @@ import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
-import teetime.variant.methodcallWithPorts.stage.Distributor;
 import teetime.variant.methodcallWithPorts.stage.EndStage;
 import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
 import teetime.variant.methodcallWithPorts.stage.Relay;
+import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
 import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
 import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
 
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java
index 988088f..845d18e 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java
@@ -14,10 +14,10 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
 import teetime.variant.methodcallWithPorts.stage.Clock;
-import teetime.variant.methodcallWithPorts.stage.Distributor;
 import teetime.variant.methodcallWithPorts.stage.EndStage;
 import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
 import teetime.variant.methodcallWithPorts.stage.Relay;
+import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
 import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
 import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
 import teetime.variant.methodcallWithPorts.stage.kieker.traceReduction.TraceAggregationBuffer;
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java
index 141ed49..7355908 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java
@@ -14,10 +14,10 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
 import teetime.variant.methodcallWithPorts.stage.Clock;
 import teetime.variant.methodcallWithPorts.stage.CountingFilter;
-import teetime.variant.methodcallWithPorts.stage.Distributor;
 import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
 import teetime.variant.methodcallWithPorts.stage.EndStage;
 import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
+import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
 import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
 import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
 
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
index 987ed2a..e006146 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java
@@ -17,12 +17,12 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
 import teetime.variant.methodcallWithPorts.stage.Clock;
 import teetime.variant.methodcallWithPorts.stage.CountingFilter;
-import teetime.variant.methodcallWithPorts.stage.Distributor;
 import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage;
 import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
 import teetime.variant.methodcallWithPorts.stage.EndStage;
 import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
 import teetime.variant.methodcallWithPorts.stage.Relay;
+import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
 import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
 import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
 
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
index e916bf9..f1a5f78 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java
@@ -18,12 +18,12 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe
 import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
 import teetime.variant.methodcallWithPorts.stage.Clock;
 import teetime.variant.methodcallWithPorts.stage.CountingFilter;
-import teetime.variant.methodcallWithPorts.stage.Distributor;
 import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage;
 import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
 import teetime.variant.methodcallWithPorts.stage.EndStage;
 import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
 import teetime.variant.methodcallWithPorts.stage.Relay;
+import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
 import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
 import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
 import teetime.variant.methodcallWithPorts.stage.kieker.traceReduction.TraceAggregationBuffer;
-- 
GitLab