diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java
index eee5f2285f9d3bc01af18c25c49c0eedb1865a62..4fd4b539ba2e24ae32eba88e132e0505e1d81a6b 100644
--- a/src/main/java/teetime/stage/io/EveryXthPrinter.java
+++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java
@@ -15,43 +15,38 @@
  */
 package teetime.stage.io;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
+import teetime.framework.CompositeStage;
 import teetime.framework.InputPort;
 import teetime.framework.OutputPort;
 import teetime.framework.Stage;
 import teetime.framework.TerminationStrategy;
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.framework.signal.ISignal;
 import teetime.framework.validation.InvalidPortConnection;
 import teetime.stage.EveryXthStage;
 import teetime.stage.basic.distributor.CopyByReferenceStrategy;
 import teetime.stage.basic.distributor.Distributor;
 
-public final class EveryXthPrinter<T> extends Stage {
+public final class EveryXthPrinter<T> extends CompositeStage {
 
 	private final Distributor<T> distributor;
+	private final List<Stage> lastStages = new ArrayList<Stage>();
 
 	public EveryXthPrinter(final int threshold) {
 		distributor = new Distributor<T>();
 		EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold);
 		Printer<Integer> printer = new Printer<Integer>();
 
-		IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
-		pipeFactory.create(distributor.getNewOutputPort(), everyXthStage.getInputPort());
-		pipeFactory.create(everyXthStage.getOutputPort(), printer.getInputPort());
+		connectStages(distributor.getNewOutputPort(), everyXthStage.getInputPort());
+		connectStages(everyXthStage.getOutputPort(), printer.getInputPort());
 
+		lastStages.add(printer);
 		distributor.setStrategy(new CopyByReferenceStrategy());
 	}
 
-	@Override
-	protected void executeWithPorts() {
-		distributor.executeWithPorts();
-	}
-
 	@Override
 	public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
 		distributor.validateOutputPorts(invalidPortConnections);
@@ -95,4 +90,14 @@ public final class EveryXthPrinter<T> extends Stage {
 		return distributor.isStarted();
 	}
 
+	@Override
+	protected Stage getFirstStage() {
+		return distributor;
+	}
+
+	@Override
+	protected Collection<? extends Stage> getLastStages() {
+		return lastStages;
+	}
+
 }