diff --git a/src/main/java/teetime/framework/test/StageTester.java b/src/main/java/teetime/framework/test/StageTester.java
new file mode 100644
index 0000000000000000000000000000000000000000..550ccfcf84e0fbd47dac673bb94c3d4aaca8ca97
--- /dev/null
+++ b/src/main/java/teetime/framework/test/StageTester.java
@@ -0,0 +1,150 @@
+/**
+ * Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package teetime.framework.test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.AssertionFailedError;
+
+import teetime.framework.Analysis;
+import teetime.framework.AnalysisConfiguration;
+import teetime.framework.InputPort;
+import teetime.framework.OutputPort;
+import teetime.framework.Stage;
+import teetime.framework.pipe.IPipeFactory;
+import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
+import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
+import teetime.stage.CollectorSink;
+import teetime.stage.IterableProducer;
+
+/**
+ * This class can be used to test single stages in JUnit test cases.
+ *
+ * @author Nils Christian Ehmke
+ */
+public final class StageTester {
+
+	private final List<InputHolder<?>> inputHolders = new ArrayList<InputHolder<?>>();
+	private final List<OutputHolder<?>> outputHolders = new ArrayList<OutputHolder<?>>();
+	private final Stage stage;
+
+	private StageTester(final Stage stage) {
+		this.stage = stage;
+	}
+
+	public static StageTester test(final Stage stage) {
+		return new StageTester(stage);
+	}
+
+	public <I> InputHolder<I> send(final Iterable<I> input) {
+		InputHolder<I> inputHolder = new InputHolder<I>(input);
+		this.inputHolders.add(inputHolder);
+		return inputHolder;
+	}
+
+	public <O> OutputHolder<O> receive(final List<O> output) {
+		OutputHolder<O> outputHolder = new OutputHolder<O>(output);
+		this.outputHolders.add(outputHolder);
+		return outputHolder;
+	}
+
+	public StageTester and() {
+		return this;
+	}
+
+	public void start() {
+		final AnalysisConfiguration configuration = new Configuration();
+		final Analysis analysis = new Analysis(configuration);
+		analysis.start();
+	}
+
+	public final class InputHolder<I> {
+
+		private final Iterable<Object> input;
+		private InputPort<Object> port;
+
+		@SuppressWarnings("unchecked")
+		private InputHolder(final Iterable<I> input) {
+			this.input = (Iterable<Object>) input;
+		}
+
+		@SuppressWarnings("unchecked")
+		public StageTester to(final InputPort<I> port) {
+			if (port.getOwningStage() != stage) {
+				throw new AssertionFailedError();
+			}
+			this.port = (InputPort<Object>) port;
+
+			return StageTester.this;
+		}
+
+		public Iterable<Object> getInput() {
+			return input;
+		}
+
+		public InputPort<Object> getPort() {
+			return port;
+		}
+
+	}
+
+	public final class OutputHolder<O> {
+
+		private final List<Object> output;
+		private OutputPort<Object> port;
+
+		@SuppressWarnings("unchecked")
+		private OutputHolder(final List<O> output) {
+			this.output = (List<Object>) output;
+		}
+
+		@SuppressWarnings("unchecked")
+		public StageTester from(final OutputPort<O> port) {
+			this.port = (OutputPort<Object>) port;
+
+			return StageTester.this;
+		}
+
+		public List<Object> getOutput() {
+			return output;
+		}
+
+		public OutputPort<Object> getPort() {
+			return port;
+		}
+
+	}
+
+	private final class Configuration extends AnalysisConfiguration {
+
+		public Configuration() {
+			IPipeFactory pipeFactory = AnalysisConfiguration.PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
+
+			for (InputHolder<?> inputHolder : inputHolders) {
+				final IterableProducer<Object> producer = new IterableProducer<Object>(inputHolder.getInput());
+				pipeFactory.create(producer.getOutputPort(), inputHolder.getPort());
+				addThreadableStage(producer);
+			}
+			for (OutputHolder<?> outputHolder : outputHolders) {
+				final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutput());
+				pipeFactory.create(outputHolder.getPort(), sink.getInputPort());
+			}
+		}
+
+	}
+
+}
diff --git a/src/test/java/teetime/stage/MultipleInstanceOfFilterTest.java b/src/test/java/teetime/stage/MultipleInstanceOfFilterTest.java
index 50e580fa4618b6d203428abb99e5ceee4988d032..4513ebaa04093363a9aafe9ec44605cdc0d5427d 100644
--- a/src/test/java/teetime/stage/MultipleInstanceOfFilterTest.java
+++ b/src/test/java/teetime/stage/MultipleInstanceOfFilterTest.java
@@ -16,61 +16,48 @@
 package teetime.stage;
 
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 
 import org.junit.Test;
 
-import teetime.framework.Analysis;
-import teetime.framework.AnalysisConfiguration;
 import teetime.framework.OutputPort;
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
-import teetime.util.Pair;
+import teetime.framework.test.StageTester;
 
+/**
+ * @author Nils Christian Ehmke
+ */
 public class MultipleInstanceOfFilterTest {
 
-	private static class TestConfiguration extends AnalysisConfiguration {
-
-		public TestConfiguration(final List<Number> initialInput, final List<Integer> integerList, final List<Float> floatList) {
-			// Create the stages
-			final InitialElementProducer<Number> producer = new InitialElementProducer<Number>(initialInput.toArray(new Number[0]));
-			final MultipleInstanceOfFilter<Number> filter = new MultipleInstanceOfFilter<Number>();
-			final CollectorSink<Integer> integerSink = new CollectorSink<Integer>(integerList);
-			final CollectorSink<Float> floatSink = new CollectorSink<Float>(floatList);
-
-			// Connect the stages
-			final IPipeFactory factory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
-			factory.create(producer.getOutputPort(), filter.getInputPort());
-			factory.create(filter.getOutputPortForType(Integer.class), integerSink.getInputPort());
-			factory.create(filter.getOutputPortForType(Float.class), floatSink.getInputPort());
+	@Test
+	@SuppressWarnings("unchecked")
+	public void filteringForSingleTypeShouldWork() {
+		final MultipleInstanceOfFilter<Object> filter = new MultipleInstanceOfFilter<Object>();
+		final List<Object> input = new ArrayList<Object>(Arrays.asList("1", 1.5f, "2", 2.5f, "3", 3.5f));
+		final List<String> result = new ArrayList<String>();
 
-			super.addThreadableStage(producer);
-		}
+		StageTester.test(filter).and().send(input).to(filter.getInputPort()).and().receive(result).from(filter.getOutputPortForType(String.class)).start();
 
+		assertThat(result, contains("1", "2", "3"));
 	}
 
 	@Test
 	@SuppressWarnings("unchecked")
-	public void filteringShouldWork() {
-		final List<Number> initialInput = new ArrayList<Number>(Arrays.asList(1, 1.5f, 2, 2.5f, 3, 3.5f));
-		final List<Integer> integerList = new ArrayList<Integer>();
-		final List<Float> floatList = new ArrayList<Float>();
-
-		final Analysis analysis = new Analysis(new TestConfiguration(initialInput, integerList, floatList));
-		final Collection<Pair<Thread, Throwable>> errors = analysis.start();
+	public void filteringForMultipleTypesShouldWork() {
+		final MultipleInstanceOfFilter<Number> filter = new MultipleInstanceOfFilter<Number>();
+		final List<Number> input = new ArrayList<Number>(Arrays.asList(1, 1.5f, 2, 2.5f, 3, 3.5f));
+		final List<Integer> integers = new ArrayList<Integer>();
+		final List<Float> floats = new ArrayList<Float>();
 
-		assertThat(errors, is(empty()));
+		StageTester.test(filter).and().send(input).to(filter.getInputPort()).and().receive(integers).from(filter.getOutputPortForType(Integer.class)).and()
+				.receive(floats).from(filter.getOutputPortForType(Float.class)).start();
 
-		assertThat(integerList, contains(1, 2, 3));
-		assertThat(floatList, contains(1.5f, 2.5f, 3.5f));
+		assertThat(integers, contains(1, 2, 3));
+		assertThat(floats, contains(1.5f, 2.5f, 3.5f));
 	}
 
 	@Test