From fbd1853b38001c5806fc0b37054e365e78330c73 Mon Sep 17 00:00:00 2001 From: Nelson Tavares de Sousa <ntd@informatik.uni-kiel.de> Date: Mon, 20 Apr 2015 14:47:53 +0200 Subject: [PATCH] refactored getLastStages --- .../framework/AbstractCompositeStage.java | 24 +++++++++++++++++-- .../java/teetime/framework/AbstractPort.java | 8 ++++++- .../java/teetime/framework/AbstractStage.java | 2 +- .../java/teetime/framework/InputPort.java | 9 +------ .../java/teetime/framework/OutputPort.java | 4 ++-- .../teetime/stage/io/EveryXthPrinter.java | 6 ----- .../teetime/stage/string/WordCounter.java | 6 ----- 7 files changed, 33 insertions(+), 26 deletions(-) diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 84ced579..44b10e65 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -18,7 +18,9 @@ package teetime.framework; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry; @@ -39,9 +41,14 @@ public abstract class AbstractCompositeStage extends Stage { private static final IPipeFactory INTRA_PIPE_FACTORY = PipeFactoryRegistry.INSTANCE .getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + private final Set<Stage> containingStages = new HashSet<Stage>(); + private final Set<Stage> lastStages = new HashSet<Stage>(); + protected abstract Stage getFirstStage(); - protected abstract Collection<? extends Stage> getLastStages(); + protected final Collection<? extends Stage> getLastStages() { + return lastStages; + } @Override protected final void executeStage() { @@ -100,8 +107,10 @@ public abstract class AbstractCompositeStage extends Stage { super.setOwningThread(owningThread); } - protected static <T> void connectStages(final OutputPort<? extends T> out, final InputPort<T> in) { + protected <T> void connectStages(final OutputPort<? extends T> out, final InputPort<T> in) { INTRA_PIPE_FACTORY.create(out, in); + containingStages.add(out.getOwningStage()); + containingStages.add(in.getOwningStage()); } @Override @@ -116,6 +125,17 @@ public abstract class AbstractCompositeStage extends Stage { @Override public final void onStarting() throws Exception { + for (Stage stage : containingStages) { + if (stage.getOutputPorts().length == 0) { + lastStages.add(stage); + break; + } + for (OutputPort<?> outputPort : stage.getOutputPorts()) { + if (!containingStages.contains(outputPort.getPipe().getTargetPort().getOwningStage())) { + lastStages.add(stage); + } + } + } getFirstStage().onStarting(); } diff --git a/src/main/java/teetime/framework/AbstractPort.java b/src/main/java/teetime/framework/AbstractPort.java index 80ec742b..0725cbe8 100644 --- a/src/main/java/teetime/framework/AbstractPort.java +++ b/src/main/java/teetime/framework/AbstractPort.java @@ -27,10 +27,12 @@ public abstract class AbstractPort<T> { * </p> */ protected final Class<T> type; + private final Stage owningStage; - public AbstractPort(final Class<T> type) { + public AbstractPort(final Class<T> type, final Stage owningStage) { super(); this.type = type; + this.owningStage = owningStage; } public IPipe getPipe() { @@ -44,4 +46,8 @@ public abstract class AbstractPort<T> { public Class<T> getType() { return this.type; } + + public final Stage getOwningStage() { + return owningStage; + } } diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 3c0086dc..f2621622 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -167,7 +167,7 @@ public abstract class AbstractStage extends Stage { * @return Newly added OutputPort */ protected <T> OutputPort<T> createOutputPort(final Class<T> type) { - final OutputPort<T> outputPort = new OutputPort<T>(type); + final OutputPort<T> outputPort = new OutputPort<T>(type, this); outputPorts = addElementToArray(outputPort, outputPorts); return outputPort; } diff --git a/src/main/java/teetime/framework/InputPort.java b/src/main/java/teetime/framework/InputPort.java index 6c22a44d..2c2fee6f 100644 --- a/src/main/java/teetime/framework/InputPort.java +++ b/src/main/java/teetime/framework/InputPort.java @@ -17,11 +17,8 @@ package teetime.framework; public final class InputPort<T> extends AbstractPort<T> { - private final Stage owningStage; - InputPort(final Class<T> type, final Stage owningStage) { - super(type); - this.owningStage = owningStage; + super(type, owningStage); } /** @@ -33,10 +30,6 @@ public final class InputPort<T> extends AbstractPort<T> { return (T) this.pipe.removeLast(); } - public Stage getOwningStage() { - return this.owningStage; - } - public boolean isClosed() { return pipe.isClosed() && !pipe.hasMore(); } diff --git a/src/main/java/teetime/framework/OutputPort.java b/src/main/java/teetime/framework/OutputPort.java index c1385d4c..4d23baaa 100644 --- a/src/main/java/teetime/framework/OutputPort.java +++ b/src/main/java/teetime/framework/OutputPort.java @@ -29,8 +29,8 @@ import teetime.framework.signal.TerminatingSignal; */ public final class OutputPort<T> extends AbstractPort<T> { - OutputPort(final Class<T> type) { - super(type); + OutputPort(final Class<T> type, final Stage owningStage) { + super(type, owningStage); } /** diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java index 72a05fd9..bc218cf3 100644 --- a/src/main/java/teetime/stage/io/EveryXthPrinter.java +++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java @@ -16,7 +16,6 @@ package teetime.stage.io; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import teetime.framework.AbstractCompositeStage; @@ -57,9 +56,4 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage { return distributor; } - @Override - protected Collection<? extends Stage> getLastStages() { - return lastStages; - } - } diff --git a/src/main/java/teetime/stage/string/WordCounter.java b/src/main/java/teetime/stage/string/WordCounter.java index 41813781..c2a8a409 100644 --- a/src/main/java/teetime/stage/string/WordCounter.java +++ b/src/main/java/teetime/stage/string/WordCounter.java @@ -16,7 +16,6 @@ package teetime.stage.string; import java.util.ArrayList; -import java.util.Collection; import teetime.framework.AbstractCompositeStage; import teetime.framework.InputPort; @@ -56,11 +55,6 @@ public final class WordCounter extends AbstractCompositeStage { return this.tokenizer; } - @Override - protected Collection<? extends Stage> getLastStages() { - return this.lastStages; - } - public InputPort<String> getInputPort() { return this.tokenizer.getInputPort(); } -- GitLab