From 6a727ed21b251f053c8e65ceb43d341ebb99c702 Mon Sep 17 00:00:00 2001
From: Nelson Tavares de Sousa <ntd@informatik.uni-kiel.de>
Date: Thu, 19 Feb 2015 11:27:52 +0100
Subject: [PATCH] refactored EveryXthPrinter

---
 .../teetime/stage/io/EveryXthPrinter.java     | 31 +++++++++++--------
 1 file changed, 18 insertions(+), 13 deletions(-)

diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java
index eee5f228..4fd4b539 100644
--- a/src/main/java/teetime/stage/io/EveryXthPrinter.java
+++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java
@@ -15,43 +15,38 @@
  */
 package teetime.stage.io;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
+import teetime.framework.CompositeStage;
 import teetime.framework.InputPort;
 import teetime.framework.OutputPort;
 import teetime.framework.Stage;
 import teetime.framework.TerminationStrategy;
-import teetime.framework.pipe.IPipeFactory;
-import teetime.framework.pipe.PipeFactoryRegistry;
-import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
-import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
 import teetime.framework.signal.ISignal;
 import teetime.framework.validation.InvalidPortConnection;
 import teetime.stage.EveryXthStage;
 import teetime.stage.basic.distributor.CopyByReferenceStrategy;
 import teetime.stage.basic.distributor.Distributor;
 
-public final class EveryXthPrinter<T> extends Stage {
+public final class EveryXthPrinter<T> extends CompositeStage {
 
 	private final Distributor<T> distributor;
+	private final List<Stage> lastStages = new ArrayList<Stage>();
 
 	public EveryXthPrinter(final int threshold) {
 		distributor = new Distributor<T>();
 		EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold);
 		Printer<Integer> printer = new Printer<Integer>();
 
-		IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
-		pipeFactory.create(distributor.getNewOutputPort(), everyXthStage.getInputPort());
-		pipeFactory.create(everyXthStage.getOutputPort(), printer.getInputPort());
+		connectStages(distributor.getNewOutputPort(), everyXthStage.getInputPort());
+		connectStages(everyXthStage.getOutputPort(), printer.getInputPort());
 
+		lastStages.add(printer);
 		distributor.setStrategy(new CopyByReferenceStrategy());
 	}
 
-	@Override
-	protected void executeWithPorts() {
-		distributor.executeWithPorts();
-	}
-
 	@Override
 	public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
 		distributor.validateOutputPorts(invalidPortConnections);
@@ -95,4 +90,14 @@ public final class EveryXthPrinter<T> extends Stage {
 		return distributor.isStarted();
 	}
 
+	@Override
+	protected Stage getFirstStage() {
+		return distributor;
+	}
+
+	@Override
+	protected Collection<? extends Stage> getLastStages() {
+		return lastStages;
+	}
+
 }
-- 
GitLab