From d68d091e0a713ef96f7bd90e6180cd5c56b17d5f Mon Sep 17 00:00:00 2001
From: Nelson Tavares de Sousa <ntd@informatik.uni-kiel.de>
Date: Mon, 20 Apr 2015 12:51:21 +0200
Subject: [PATCH] introduced static methods for connecting stages #138

---
 .../framework/AnalysisConfiguration.java      | 34 +++++++++++++++++--
 .../teetime/framework/test/StageTester.java   | 10 ++----
 .../MethodCallThroughputAnalysis15.java       | 19 +++--------
 .../experiment16/AnalysisConfiguration16.java | 20 ++++-------
 .../LoopStageAnalysisConfiguration.java       |  6 +---
 .../examples/cipher/CipherConfiguration.java  | 17 ++++------
 .../tokenizer/TokenizerConfiguration.java     | 16 ++++-----
 .../java/teetime/framework/AnalysisTest.java  |  6 +---
 ...unnableConsumerStageTestConfiguration.java |  6 +---
 .../java/teetime/framework/StageTest.java     |  6 +---
 .../java/teetime/framework/TraversorTest.java | 17 +++-------
 .../framework/WaitStrategyConfiguration.java  | 16 +++------
 .../framework/YieldStrategyConfiguration.java | 11 ++----
 .../ExceptionHandlingTest.java                |  1 +
 .../ExceptionTestConfiguration.java           |  5 +--
 .../teetime/stage/InstanceOfFilterTest.java   | 11 ++----
 16 files changed, 78 insertions(+), 123 deletions(-)

diff --git a/src/main/java/teetime/framework/AnalysisConfiguration.java b/src/main/java/teetime/framework/AnalysisConfiguration.java
index 82e8152c..db99b83b 100644
--- a/src/main/java/teetime/framework/AnalysisConfiguration.java
+++ b/src/main/java/teetime/framework/AnalysisConfiguration.java
@@ -18,7 +18,11 @@ package teetime.framework;
 import java.util.LinkedList;
 import java.util.List;
 
+import teetime.framework.pipe.IPipe;
+import teetime.framework.pipe.IPipeFactory;
 import teetime.framework.pipe.PipeFactoryRegistry;
+import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
+import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 
 /**
  * Represents a configuration of connected stages, which is needed to run a analysis.
@@ -26,11 +30,17 @@ import teetime.framework.pipe.PipeFactoryRegistry;
  */
 public abstract class AnalysisConfiguration {
 
+	private final List<Stage> threadableStageJobs = new LinkedList<Stage>();
+	private final IPipeFactory intraThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
+	private final IPipeFactory interBoundedThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, true);
+	private final IPipeFactory interUnboundedThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
+
 	/**
 	 * Can be used by subclasses, to obtain pipe factories
 	 */
+	@Deprecated
+	// TODO: set private
 	protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE;
-	private final List<Stage> threadableStageJobs = new LinkedList<Stage>();
 
 	List<Stage> getThreadableStageJobs() {
 		return this.threadableStageJobs;
@@ -40,10 +50,30 @@ public abstract class AnalysisConfiguration {
 	 * Execute this method, to add a stage to the configuration, which should be executed in a own thread.
 	 *
 	 * @param stage
-	 *            A arbitrary stage, which will be added to the configuration und executed in a thread.
+	 *            A arbitrary stage, which will be added to the configuration and executed in a thread.
 	 */
 	protected void addThreadableStage(final Stage stage) {
 		this.threadableStageJobs.add(stage);
 	}
 
+	protected <T> IPipe connectIntraThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+		return intraThreadFactory.create(sourcePort, targetPort);
+	}
+
+	protected <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+		return interBoundedThreadFactory.create(sourcePort, targetPort);
+	}
+
+	protected <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+		return interUnboundedThreadFactory.create(sourcePort, targetPort);
+	}
+
+	protected <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		return interBoundedThreadFactory.create(sourcePort, targetPort, capacity);
+	}
+
+	protected <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		return interUnboundedThreadFactory.create(sourcePort, targetPort, capacity);
+	}
+
 }
diff --git a/src/main/java/teetime/framework/test/StageTester.java b/src/main/java/teetime/framework/test/StageTester.java
index 4d3cf95e..29b64eeb 100644
--- a/src/main/java/teetime/framework/test/StageTester.java
+++ b/src/main/java/teetime/framework/test/StageTester.java
@@ -23,9 +23,6 @@ import teetime.framework.Analysis;
 import teetime.framework.AnalysisConfiguration;
 import teetime.framework.Stage;
 import teetime.framework.StageState;
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.stage.CollectorSink;
 import teetime.stage.IterableProducer;
 
@@ -80,22 +77,19 @@ public final class StageTester {
 	private final class Configuration extends AnalysisConfiguration {
 
 		public Configuration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) {
-			final IPipeFactory interPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
 			for (InputHolder<?> inputHolder : inputHolders) {
 				final IterableProducer<Object> producer = new IterableProducer<Object>(inputHolder.getInput());
-				interPipeFactory.create(producer.getOutputPort(), inputHolder.getPort());
+				connectBoundedInterThreads(producer.getOutputPort(), inputHolder.getPort());
 				addThreadableStage(producer);
 			}
 
 			addThreadableStage(stage);
 
-			final IPipeFactory intraPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
 			for (OutputHolder<?> outputHolder : outputHolders) {
 				final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutputElements());
-				intraPipeFactory.create(outputHolder.getPort(), sink.getInputPort());
+				connectIntraThreads(outputHolder.getPort(), sink.getInputPort());
 			}
 		}
-
 	}
 
 }
diff --git a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java
index be73f9db..51586ffb 100644
--- a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java
+++ b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java
@@ -21,10 +21,7 @@ import teetime.framework.AnalysisConfiguration;
 import teetime.framework.OldHeadPipeline;
 import teetime.framework.RunnableProducerStage;
 import teetime.framework.Stage;
-import teetime.framework.pipe.IPipeFactory;
 import teetime.framework.pipe.OrderedGrowableArrayPipe;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.framework.pipe.SpScPipe;
 import teetime.stage.Clock;
 import teetime.stage.CollectorSink;
@@ -47,8 +44,6 @@ public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration {
 
 	private static final int SPSC_INITIAL_CAPACITY = 4;
 
-	private final IPipeFactory intraThreadPipeFactory;
-
 	private int numInputObjects;
 	private ConstructorClosure<TimestampObject> inputObjectCreator;
 	private int numNoopFilters;
@@ -58,10 +53,6 @@ public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration {
 	private Runnable runnable;
 	private Clock clock;
 
-	public MethodCallThroughputAnalysis15() {
-		intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
-	}
-
 	public void init() {
 		OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline();
 		this.clockRunnable = new RunnableProducerStage(clockPipeline);
@@ -107,15 +98,15 @@ public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration {
 
 		SpScPipe.connect(clock.getOutputPort(), delay.getTimestampTriggerInputPort(), SPSC_INITIAL_CAPACITY);
 
-		intraThreadPipeFactory.create(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
-		intraThreadPipeFactory.create(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
+		connectIntraThreads(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
+		connectIntraThreads(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
 		for (int i = 0; i < noopFilters.length - 1; i++) {
-			intraThreadPipeFactory.create(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
+			connectIntraThreads(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
 		}
-		intraThreadPipeFactory.create(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
+		connectIntraThreads(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
 		OrderedGrowableArrayPipe.connect(stopTimestampFilter.getOutputPort(), delay.getInputPort());
 
-		intraThreadPipeFactory.create(delay.getOutputPort(), collectorSink.getInputPort());
+		connectIntraThreads(delay.getOutputPort(), collectorSink.getInputPort());
 
 		return pipeline;
 	}
diff --git a/src/performancetest/java/teetime/examples/experiment16/AnalysisConfiguration16.java b/src/performancetest/java/teetime/examples/experiment16/AnalysisConfiguration16.java
index feeb708c..396cce7a 100644
--- a/src/performancetest/java/teetime/examples/experiment16/AnalysisConfiguration16.java
+++ b/src/performancetest/java/teetime/examples/experiment16/AnalysisConfiguration16.java
@@ -21,9 +21,6 @@ import java.util.List;
 
 import teetime.framework.AnalysisConfiguration;
 import teetime.framework.OldHeadPipeline;
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.framework.pipe.SpScPipe;
 import teetime.stage.CollectorSink;
 import teetime.stage.NoopFilter;
@@ -46,8 +43,6 @@ class AnalysisConfiguration16 extends AnalysisConfiguration {
 	private static final int SPSC_INITIAL_CAPACITY = 100100;
 	private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors();
 
-	private final IPipeFactory intraThreadPipeFactory;
-
 	private int numInputObjects;
 	private ConstructorClosure<TimestampObject> inputObjectCreator;
 	private final int numNoopFilters;
@@ -59,7 +54,6 @@ class AnalysisConfiguration16 extends AnalysisConfiguration {
 	public AnalysisConfiguration16(final int numWorkerThreads, final int numNoopFilters) {
 		this.numWorkerThreads = numWorkerThreads;
 		this.numNoopFilters = numNoopFilters;
-		this.intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
 	}
 
 	public void build() {
@@ -87,7 +81,7 @@ class AnalysisConfiguration16 extends AnalysisConfiguration {
 		pipeline.setFirstStage(objectProducer);
 		pipeline.setLastStage(distributor);
 
-		intraThreadPipeFactory.create(objectProducer.getOutputPort(), distributor.getInputPort());
+		connectIntraThreads(objectProducer.getOutputPort(), distributor.getInputPort());
 
 		return pipeline;
 	}
@@ -117,15 +111,15 @@ class AnalysisConfiguration16 extends AnalysisConfiguration {
 
 		SpScPipe.connect(previousStage.getLastStage().getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY);
 
-		intraThreadPipeFactory.create(relay.getOutputPort(), startTimestampFilter.getInputPort());
+		connectIntraThreads(relay.getOutputPort(), startTimestampFilter.getInputPort());
 
-		intraThreadPipeFactory.create(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
+		connectIntraThreads(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
 		for (int i = 0; i < noopFilters.length - 1; i++) {
-			intraThreadPipeFactory.create(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
+			connectIntraThreads(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
 		}
-		intraThreadPipeFactory.create(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
-		intraThreadPipeFactory.create(stopTimestampFilter.getOutputPort(), everyXthPrinter.getInputPort());
-		intraThreadPipeFactory.create(everyXthPrinter.getNewOutputPort(), collectorSink.getInputPort());
+		connectIntraThreads(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
+		connectIntraThreads(stopTimestampFilter.getOutputPort(), everyXthPrinter.getInputPort());
+		connectIntraThreads(everyXthPrinter.getNewOutputPort(), collectorSink.getInputPort());
 
 		return pipeline;
 	}
diff --git a/src/performancetest/java/teetime/examples/loopStage/LoopStageAnalysisConfiguration.java b/src/performancetest/java/teetime/examples/loopStage/LoopStageAnalysisConfiguration.java
index 65823155..3978b766 100644
--- a/src/performancetest/java/teetime/examples/loopStage/LoopStageAnalysisConfiguration.java
+++ b/src/performancetest/java/teetime/examples/loopStage/LoopStageAnalysisConfiguration.java
@@ -16,17 +16,13 @@
 package teetime.examples.loopStage;
 
 import teetime.framework.AnalysisConfiguration;
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 
 public class LoopStageAnalysisConfiguration extends AnalysisConfiguration {
 
 	public LoopStageAnalysisConfiguration() {
 		Countdown countdown = new Countdown(10);
 
-		IPipeFactory factory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.QUEUE_BASED, true);
-		factory.create(countdown.getNewCountdownOutputPort(), countdown.getCountdownInputPort());
+		connectIntraThreads(countdown.getNewCountdownOutputPort(), countdown.getCountdownInputPort());
 
 		// this.getFiniteProducerStages().add(countdown);
 		this.addThreadableStage(countdown);
diff --git a/src/test/java/teetime/examples/cipher/CipherConfiguration.java b/src/test/java/teetime/examples/cipher/CipherConfiguration.java
index 5531ef8c..64dbbb0b 100644
--- a/src/test/java/teetime/examples/cipher/CipherConfiguration.java
+++ b/src/test/java/teetime/examples/cipher/CipherConfiguration.java
@@ -18,9 +18,6 @@ package teetime.examples.cipher;
 import java.io.File;
 
 import teetime.framework.AnalysisConfiguration;
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.stage.CipherByteArray;
 import teetime.stage.CipherByteArray.CipherMode;
 import teetime.stage.InitialElementProducer;
@@ -43,14 +40,12 @@ public class CipherConfiguration extends AnalysisConfiguration {
 		final CipherByteArray decrypt = new CipherByteArray(password, CipherMode.DECRYPT);
 		final ByteArrayFileWriter writer = new ByteArrayFileWriter(output);
 
-		final IPipeFactory intraFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
-
-		intraFactory.create(init.getOutputPort(), f2b.getInputPort());
-		intraFactory.create(f2b.getOutputPort(), enc.getInputPort());
-		intraFactory.create(enc.getOutputPort(), comp.getInputPort());
-		intraFactory.create(comp.getOutputPort(), decomp.getInputPort());
-		intraFactory.create(decomp.getOutputPort(), decrypt.getInputPort());
-		intraFactory.create(decrypt.getOutputPort(), writer.getInputPort());
+		connectIntraThreads(init.getOutputPort(), f2b.getInputPort());
+		connectIntraThreads(f2b.getOutputPort(), enc.getInputPort());
+		connectIntraThreads(enc.getOutputPort(), comp.getInputPort());
+		connectIntraThreads(comp.getOutputPort(), decomp.getInputPort());
+		connectIntraThreads(decomp.getOutputPort(), decrypt.getInputPort());
+		connectIntraThreads(decrypt.getOutputPort(), writer.getInputPort());
 
 		// this.getFiniteProducerStages().add(init);
 		this.addThreadableStage(init);
diff --git a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java
index 5d00b8f6..129cdd4f 100644
--- a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java
+++ b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java
@@ -18,9 +18,6 @@ package teetime.examples.tokenizer;
 import java.io.File;
 
 import teetime.framework.AnalysisConfiguration;
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.stage.ByteArray2String;
 import teetime.stage.CipherByteArray;
 import teetime.stage.CipherByteArray.CipherMode;
@@ -33,7 +30,6 @@ import teetime.stage.string.Tokenizer;
 
 public class TokenizerConfiguration extends AnalysisConfiguration {
 
-	private static final IPipeFactory INTRA_PIPE_FACTORY = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
 	private final Counter<String> counter;
 
 	public TokenizerConfiguration(final String inputFile, final String password) {
@@ -47,12 +43,12 @@ public class TokenizerConfiguration extends AnalysisConfiguration {
 		final Tokenizer tokenizer = new Tokenizer(" ");
 		this.counter = new Counter<String>();
 
-		INTRA_PIPE_FACTORY.create(init.getOutputPort(), f2b.getInputPort());
-		INTRA_PIPE_FACTORY.create(f2b.getOutputPort(), decomp.getInputPort());
-		INTRA_PIPE_FACTORY.create(decomp.getOutputPort(), decrypt.getInputPort());
-		INTRA_PIPE_FACTORY.create(decrypt.getOutputPort(), b2s.getInputPort());
-		INTRA_PIPE_FACTORY.create(b2s.getOutputPort(), tokenizer.getInputPort());
-		INTRA_PIPE_FACTORY.create(tokenizer.getOutputPort(), this.counter.getInputPort());
+		connectIntraThreads(init.getOutputPort(), f2b.getInputPort());
+		connectIntraThreads(f2b.getOutputPort(), decomp.getInputPort());
+		connectIntraThreads(decomp.getOutputPort(), decrypt.getInputPort());
+		connectIntraThreads(decrypt.getOutputPort(), b2s.getInputPort());
+		connectIntraThreads(b2s.getOutputPort(), tokenizer.getInputPort());
+		connectIntraThreads(tokenizer.getOutputPort(), this.counter.getInputPort());
 
 		this.addThreadableStage(init);
 	}
diff --git a/src/test/java/teetime/framework/AnalysisTest.java b/src/test/java/teetime/framework/AnalysisTest.java
index 61929e18..f9905c60 100644
--- a/src/test/java/teetime/framework/AnalysisTest.java
+++ b/src/test/java/teetime/framework/AnalysisTest.java
@@ -25,9 +25,6 @@ import static org.junit.Assert.assertTrue;
 import org.junit.Before;
 import org.junit.Test;
 
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.stage.InitialElementProducer;
 import teetime.util.StopWatch;
 
@@ -69,13 +66,12 @@ public class AnalysisTest {
 	}
 
 	private static class TestConfig extends AnalysisConfiguration {
-		final IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
 		public final DelayAndTerminate delay;
 
 		public TestConfig() {
 			final InitialElementProducer<String> init = new InitialElementProducer<String>("Hello");
 			delay = new DelayAndTerminate(DELAY_IN_MS);
-			intraFact.create(init.getOutputPort(), delay.getInputPort());
+			connectIntraThreads(init.getOutputPort(), delay.getInputPort());
 			addThreadableStage(init);
 		}
 	}
diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java
index d542c2d3..892c3cd0 100644
--- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java
+++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java
@@ -18,9 +18,6 @@ package teetime.framework;
 import java.util.ArrayList;
 import java.util.List;
 
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.stage.CollectorSink;
 import teetime.stage.InitialElementProducer;
 
@@ -38,8 +35,7 @@ public class RunnableConsumerStageTestConfiguration extends AnalysisConfiguratio
 		CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements);
 		addThreadableStage(collectorSink);
 
-		IPipeFactory pipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
-		pipeFactory.create(producer.getOutputPort(), collectorSink.getInputPort());
+		connectBoundedInterThreads(producer.getOutputPort(), collectorSink.getInputPort());
 
 		this.collectorSink = collectorSink;
 	}
diff --git a/src/test/java/teetime/framework/StageTest.java b/src/test/java/teetime/framework/StageTest.java
index ea66d118..5fcc1dfe 100644
--- a/src/test/java/teetime/framework/StageTest.java
+++ b/src/test/java/teetime/framework/StageTest.java
@@ -23,9 +23,6 @@ import static org.junit.Assert.assertThat;
 import org.junit.Assert;
 import org.junit.Test;
 
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.stage.Cache;
 import teetime.stage.Counter;
 import teetime.stage.InitialElementProducer;
@@ -57,14 +54,13 @@ public class StageTest {
 	}
 
 	private static class TestConfig extends AnalysisConfiguration {
-		final IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
 		public final DelayAndTerminate delay;
 		public InitialElementProducer<String> init;
 
 		public TestConfig() {
 			init = new InitialElementProducer<String>("Hello");
 			delay = new DelayAndTerminate(0);
-			intraFact.create(init.getOutputPort(), delay.getInputPort());
+			connectIntraThreads(init.getOutputPort(), delay.getInputPort());
 			addThreadableStage(init);
 		}
 	}
diff --git a/src/test/java/teetime/framework/TraversorTest.java b/src/test/java/teetime/framework/TraversorTest.java
index 14d168e1..9f4ef250 100644
--- a/src/test/java/teetime/framework/TraversorTest.java
+++ b/src/test/java/teetime/framework/TraversorTest.java
@@ -24,9 +24,6 @@ import java.util.Set;
 import org.junit.Test;
 
 import teetime.framework.pipe.IPipe;
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.stage.CountingMapMerger;
 import teetime.stage.InitialElementProducer;
 import teetime.stage.basic.distributor.Distributor;
@@ -69,13 +66,9 @@ public class TraversorTest {
 			final Merger<CountingMap<String>> merger = new Merger<CountingMap<String>>();
 			// CountingMapMerger (already as field)
 
-			// PipeFactory instaces for intra- and inter-thread communication
-			final IPipeFactory interFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
-			final IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
-
 			// Connecting the stages of the first part of the config
-			intraFact.create(init.getOutputPort(), f2b.getInputPort());
-			intraFact.create(f2b.getOutputPort(), distributor.getInputPort());
+			connectIntraThreads(init.getOutputPort(), f2b.getInputPort());
+			connectIntraThreads(f2b.getOutputPort(), distributor.getInputPort());
 
 			// Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages
 			for (int i = 0; i < threads; i++) {
@@ -83,15 +76,15 @@ public class TraversorTest {
 				final WordCounter wc = new WordCounter();
 				// intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort());
 
-				final IPipe distributorPipe = interFact.create(distributor.getNewOutputPort(), wc.getInputPort(), 10000);
-				final IPipe mergerPipe = interFact.create(wc.getOutputPort(), merger.getNewInputPort());
+				final IPipe distributorPipe = connectBoundedInterThreads(distributor.getNewOutputPort(), wc.getInputPort(), 10000);
+				final IPipe mergerPipe = connectBoundedInterThreads(wc.getOutputPort(), merger.getNewInputPort());
 				// Add WordCounter as a threadable stage, so it runs in its own thread
 				addThreadableStage(wc);
 
 			}
 
 			// Connect the stages of the last part
-			intraFact.create(merger.getOutputPort(), result.getInputPort());
+			connectIntraThreads(merger.getOutputPort(), result.getInputPort());
 
 			// Add the first and last part to the threadable stages
 			addThreadableStage(init);
diff --git a/src/test/java/teetime/framework/WaitStrategyConfiguration.java b/src/test/java/teetime/framework/WaitStrategyConfiguration.java
index 269ce5bb..4d4dff16 100644
--- a/src/test/java/teetime/framework/WaitStrategyConfiguration.java
+++ b/src/test/java/teetime/framework/WaitStrategyConfiguration.java
@@ -15,9 +15,6 @@
  */
 package teetime.framework;
 
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.stage.Clock;
 import teetime.stage.CollectorSink;
 import teetime.stage.InitialElementProducer;
@@ -26,15 +23,10 @@ import teetime.stage.basic.Delay;
 
 class WaitStrategyConfiguration extends AnalysisConfiguration {
 
-	private final IPipeFactory intraThreadPipeFactory;
-	private final IPipeFactory interThreadPipeFactory;
-
 	private Delay<Object> delay;
 	private CollectorSink<Object> collectorSink;
 
 	public WaitStrategyConfiguration(final long initialDelayInMs, final Object... elements) {
-		intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
-		interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
 
 		Stage producer = buildProducer(elements);
 		addThreadableStage(producer);
@@ -50,7 +42,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration {
 		Clock clock = new Clock();
 		clock.setInitialDelayInMs(initialDelayInMs);
 
-		interThreadPipeFactory.create(clock.getOutputPort(), delay.getTimestampTriggerInputPort());
+		connectBoundedInterThreads(clock.getOutputPort(), delay.getTimestampTriggerInputPort());
 
 		return clock;
 	}
@@ -59,7 +51,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration {
 		InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements);
 		delay = new Delay<Object>();
 
-		intraThreadPipeFactory.create(initialElementProducer.getOutputPort(), delay.getInputPort());
+		connectIntraThreads(initialElementProducer.getOutputPort(), delay.getInputPort());
 
 		return initialElementProducer;
 	}
@@ -70,8 +62,8 @@ class WaitStrategyConfiguration extends AnalysisConfiguration {
 
 		// relay.setIdleStrategy(new WaitStrategy(relay));
 
-		interThreadPipeFactory.create(delay.getOutputPort(), relay.getInputPort());
-		intraThreadPipeFactory.create(relay.getOutputPort(), collectorSink.getInputPort());
+		connectBoundedInterThreads(delay.getOutputPort(), relay.getInputPort());
+		connectIntraThreads(relay.getOutputPort(), collectorSink.getInputPort());
 
 		this.collectorSink = collectorSink;
 
diff --git a/src/test/java/teetime/framework/YieldStrategyConfiguration.java b/src/test/java/teetime/framework/YieldStrategyConfiguration.java
index 3802a2c5..f3e48a4d 100644
--- a/src/test/java/teetime/framework/YieldStrategyConfiguration.java
+++ b/src/test/java/teetime/framework/YieldStrategyConfiguration.java
@@ -15,22 +15,15 @@
  */
 package teetime.framework;
 
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.stage.CollectorSink;
 import teetime.stage.InitialElementProducer;
 import teetime.stage.Relay;
 
 class YieldStrategyConfiguration extends AnalysisConfiguration {
-	private final IPipeFactory intraThreadPipeFactory;
-	private final IPipeFactory interThreadPipeFactory;
 
 	private CollectorSink<Object> collectorSink;
 
 	public YieldStrategyConfiguration(final Object... elements) {
-		intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
-		interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
 
 		InitialElementProducer<Object> producer = buildProducer(elements);
 		addThreadableStage(producer);
@@ -51,8 +44,8 @@ class YieldStrategyConfiguration extends AnalysisConfiguration {
 
 		// relay.setIdleStrategy(new YieldStrategy());
 
-		interThreadPipeFactory.create(producer.getOutputPort(), relay.getInputPort());
-		intraThreadPipeFactory.create(relay.getOutputPort(), collectorSink.getInputPort());
+		connectBoundedInterThreads(producer.getOutputPort(), relay.getInputPort());
+		connectIntraThreads(relay.getOutputPort(), collectorSink.getInputPort());
 
 		this.collectorSink = collectorSink;
 
diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java
index 4d231974..1dc02f2b 100644
--- a/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java
+++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java
@@ -60,6 +60,7 @@ public class ExceptionHandlingTest {
 				exceptionArised = true;
 			}
 			assertTrue(exceptionArised);
+
 			exceptionArised = false;
 			try {
 				terminatesAllStages();
diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java
index 95a6c8d3..a9a1d1df 100644
--- a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java
+++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java
@@ -16,8 +16,6 @@
 package teetime.framework.exceptionHandling;
 
 import teetime.framework.AnalysisConfiguration;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 
 public class ExceptionTestConfiguration extends AnalysisConfiguration {
 
@@ -30,8 +28,7 @@ public class ExceptionTestConfiguration extends AnalysisConfiguration {
 		second = new ExceptionTestConsumerStage();
 		third = new ExceptionTestProducerStage();
 
-		PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false)
-				.create(first.getOutputPort(), second.getInputPort());
+		connectBoundedInterThreads(first.getOutputPort(), second.getInputPort());
 		// this.addThreadableStage(new ExceptionTestStage());
 
 		this.addThreadableStage(first);
diff --git a/src/test/java/teetime/stage/InstanceOfFilterTest.java b/src/test/java/teetime/stage/InstanceOfFilterTest.java
index 4a85cf41..186ce0c4 100644
--- a/src/test/java/teetime/stage/InstanceOfFilterTest.java
+++ b/src/test/java/teetime/stage/InstanceOfFilterTest.java
@@ -32,9 +32,6 @@ import org.junit.Test;
 import teetime.framework.Analysis;
 import teetime.framework.AnalysisConfiguration;
 import teetime.framework.AnalysisException;
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.util.Pair;
 
 /**
@@ -127,17 +124,15 @@ public class InstanceOfFilterTest {
 
 	private static class InstanceOfFilterTestConfig extends AnalysisConfiguration {
 
-		private final IPipeFactory pipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
-
 		public InstanceOfFilterTestConfig() {
 			InitialElementProducer<Object> elementProducer = new InitialElementProducer<Object>();
 			InstanceOfFilter<Object, Clazz> instanceOfFilter = new InstanceOfFilter<Object, Clazz>(Clazz.class);
 			CollectorSink<Clazz> clazzCollector = new CollectorSink<Clazz>();
 			CollectorSink<Object> mismatchedCollector = new CollectorSink<Object>();
 
-			pipeFactory.create(elementProducer.getOutputPort(), instanceOfFilter.getInputPort());
-			pipeFactory.create(instanceOfFilter.getMatchedOutputPort(), clazzCollector.getInputPort());
-			pipeFactory.create(instanceOfFilter.getMismatchedOutputPort(), mismatchedCollector.getInputPort());
+			connectIntraThreads(elementProducer.getOutputPort(), instanceOfFilter.getInputPort());
+			connectIntraThreads(instanceOfFilter.getMatchedOutputPort(), clazzCollector.getInputPort());
+			connectIntraThreads(instanceOfFilter.getMismatchedOutputPort(), mismatchedCollector.getInputPort());
 
 			addThreadableStage(elementProducer);
 		}
-- 
GitLab