diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 84ced579f485f65e7ae0e20cf783001dd7f64bd7..44b10e6548bc3c431528d1fe50de4ef939b32783 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -18,7 +18,9 @@ package teetime.framework; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry; @@ -39,9 +41,14 @@ public abstract class AbstractCompositeStage extends Stage { private static final IPipeFactory INTRA_PIPE_FACTORY = PipeFactoryRegistry.INSTANCE .getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + private final Set<Stage> containingStages = new HashSet<Stage>(); + private final Set<Stage> lastStages = new HashSet<Stage>(); + protected abstract Stage getFirstStage(); - protected abstract Collection<? extends Stage> getLastStages(); + protected final Collection<? extends Stage> getLastStages() { + return lastStages; + } @Override protected final void executeStage() { @@ -100,8 +107,10 @@ public abstract class AbstractCompositeStage extends Stage { super.setOwningThread(owningThread); } - protected static <T> void connectStages(final OutputPort<? extends T> out, final InputPort<T> in) { + protected <T> void connectStages(final OutputPort<? extends T> out, final InputPort<T> in) { INTRA_PIPE_FACTORY.create(out, in); + containingStages.add(out.getOwningStage()); + containingStages.add(in.getOwningStage()); } @Override @@ -116,6 +125,17 @@ public abstract class AbstractCompositeStage extends Stage { @Override public final void onStarting() throws Exception { + for (Stage stage : containingStages) { + if (stage.getOutputPorts().length == 0) { + lastStages.add(stage); + break; + } + for (OutputPort<?> outputPort : stage.getOutputPorts()) { + if (!containingStages.contains(outputPort.getPipe().getTargetPort().getOwningStage())) { + lastStages.add(stage); + } + } + } getFirstStage().onStarting(); } diff --git a/src/main/java/teetime/framework/AbstractPort.java b/src/main/java/teetime/framework/AbstractPort.java index 80ec742bf8cc1e5764fe49e5cf97a13e7b01e90c..0725cbe873fce302ffdcb462df3c28d7506ebb69 100644 --- a/src/main/java/teetime/framework/AbstractPort.java +++ b/src/main/java/teetime/framework/AbstractPort.java @@ -27,10 +27,12 @@ public abstract class AbstractPort<T> { * </p> */ protected final Class<T> type; + private final Stage owningStage; - public AbstractPort(final Class<T> type) { + public AbstractPort(final Class<T> type, final Stage owningStage) { super(); this.type = type; + this.owningStage = owningStage; } public IPipe getPipe() { @@ -44,4 +46,8 @@ public abstract class AbstractPort<T> { public Class<T> getType() { return this.type; } + + public final Stage getOwningStage() { + return owningStage; + } } diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 3c0086dc4edfa3a72ec1d9117462ba08de9e0120..f2621622998c198bd69ffd3efe52a4fb581fe999 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -167,7 +167,7 @@ public abstract class AbstractStage extends Stage { * @return Newly added OutputPort */ protected <T> OutputPort<T> createOutputPort(final Class<T> type) { - final OutputPort<T> outputPort = new OutputPort<T>(type); + final OutputPort<T> outputPort = new OutputPort<T>(type, this); outputPorts = addElementToArray(outputPort, outputPorts); return outputPort; } diff --git a/src/main/java/teetime/framework/InputPort.java b/src/main/java/teetime/framework/InputPort.java index 6c22a44d36c7284f68d6e3cea904ea614c4f48ac..2c2fee6f833fd6fc8476962a9ea3cf371845366b 100644 --- a/src/main/java/teetime/framework/InputPort.java +++ b/src/main/java/teetime/framework/InputPort.java @@ -17,11 +17,8 @@ package teetime.framework; public final class InputPort<T> extends AbstractPort<T> { - private final Stage owningStage; - InputPort(final Class<T> type, final Stage owningStage) { - super(type); - this.owningStage = owningStage; + super(type, owningStage); } /** @@ -33,10 +30,6 @@ public final class InputPort<T> extends AbstractPort<T> { return (T) this.pipe.removeLast(); } - public Stage getOwningStage() { - return this.owningStage; - } - public boolean isClosed() { return pipe.isClosed() && !pipe.hasMore(); } diff --git a/src/main/java/teetime/framework/OutputPort.java b/src/main/java/teetime/framework/OutputPort.java index c1385d4c1e105902bf3f7550d3fa5d84deb99795..4d23baaad7a2030dd3bbe361f8161b8bb63dca65 100644 --- a/src/main/java/teetime/framework/OutputPort.java +++ b/src/main/java/teetime/framework/OutputPort.java @@ -29,8 +29,8 @@ import teetime.framework.signal.TerminatingSignal; */ public final class OutputPort<T> extends AbstractPort<T> { - OutputPort(final Class<T> type) { - super(type); + OutputPort(final Class<T> type, final Stage owningStage) { + super(type, owningStage); } /** diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java index 72a05fd9e0483771c72e01e1239c199896a5da48..bc218cf3d83ff6b4beb40cd19e19158e6757c94c 100644 --- a/src/main/java/teetime/stage/io/EveryXthPrinter.java +++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java @@ -16,7 +16,6 @@ package teetime.stage.io; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import teetime.framework.AbstractCompositeStage; @@ -57,9 +56,4 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage { return distributor; } - @Override - protected Collection<? extends Stage> getLastStages() { - return lastStages; - } - } diff --git a/src/main/java/teetime/stage/string/WordCounter.java b/src/main/java/teetime/stage/string/WordCounter.java index 41813781c255accec627260c3d6fb66307665194..c2a8a409d0261e5c215897fc0cd8a3905f3a9c87 100644 --- a/src/main/java/teetime/stage/string/WordCounter.java +++ b/src/main/java/teetime/stage/string/WordCounter.java @@ -16,7 +16,6 @@ package teetime.stage.string; import java.util.ArrayList; -import java.util.Collection; import teetime.framework.AbstractCompositeStage; import teetime.framework.InputPort; @@ -56,11 +55,6 @@ public final class WordCounter extends AbstractCompositeStage { return this.tokenizer; } - @Override - protected Collection<? extends Stage> getLastStages() { - return this.lastStages; - } - public InputPort<String> getInputPort() { return this.tokenizer.getInputPort(); }