Skip to content
Snippets Groups Projects
Commit fbd1853b authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

refactored getLastStages

parent c02b06fc
No related branches found
No related tags found
No related merge requests found
......@@ -18,7 +18,9 @@ package teetime.framework;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
......@@ -39,9 +41,14 @@ public abstract class AbstractCompositeStage extends Stage {
private static final IPipeFactory INTRA_PIPE_FACTORY = PipeFactoryRegistry.INSTANCE
.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
private final Set<Stage> containingStages = new HashSet<Stage>();
private final Set<Stage> lastStages = new HashSet<Stage>();
protected abstract Stage getFirstStage();
protected abstract Collection<? extends Stage> getLastStages();
protected final Collection<? extends Stage> getLastStages() {
return lastStages;
}
@Override
protected final void executeStage() {
......@@ -100,8 +107,10 @@ public abstract class AbstractCompositeStage extends Stage {
super.setOwningThread(owningThread);
}
protected static <T> void connectStages(final OutputPort<? extends T> out, final InputPort<T> in) {
protected <T> void connectStages(final OutputPort<? extends T> out, final InputPort<T> in) {
INTRA_PIPE_FACTORY.create(out, in);
containingStages.add(out.getOwningStage());
containingStages.add(in.getOwningStage());
}
@Override
......@@ -116,6 +125,17 @@ public abstract class AbstractCompositeStage extends Stage {
@Override
public final void onStarting() throws Exception {
for (Stage stage : containingStages) {
if (stage.getOutputPorts().length == 0) {
lastStages.add(stage);
break;
}
for (OutputPort<?> outputPort : stage.getOutputPorts()) {
if (!containingStages.contains(outputPort.getPipe().getTargetPort().getOwningStage())) {
lastStages.add(stage);
}
}
}
getFirstStage().onStarting();
}
......
......@@ -27,10 +27,12 @@ public abstract class AbstractPort<T> {
* </p>
*/
protected final Class<T> type;
private final Stage owningStage;
public AbstractPort(final Class<T> type) {
public AbstractPort(final Class<T> type, final Stage owningStage) {
super();
this.type = type;
this.owningStage = owningStage;
}
public IPipe getPipe() {
......@@ -44,4 +46,8 @@ public abstract class AbstractPort<T> {
public Class<T> getType() {
return this.type;
}
public final Stage getOwningStage() {
return owningStage;
}
}
......@@ -167,7 +167,7 @@ public abstract class AbstractStage extends Stage {
* @return Newly added OutputPort
*/
protected <T> OutputPort<T> createOutputPort(final Class<T> type) {
final OutputPort<T> outputPort = new OutputPort<T>(type);
final OutputPort<T> outputPort = new OutputPort<T>(type, this);
outputPorts = addElementToArray(outputPort, outputPorts);
return outputPort;
}
......
......@@ -17,11 +17,8 @@ package teetime.framework;
public final class InputPort<T> extends AbstractPort<T> {
private final Stage owningStage;
InputPort(final Class<T> type, final Stage owningStage) {
super(type);
this.owningStage = owningStage;
super(type, owningStage);
}
/**
......@@ -33,10 +30,6 @@ public final class InputPort<T> extends AbstractPort<T> {
return (T) this.pipe.removeLast();
}
public Stage getOwningStage() {
return this.owningStage;
}
public boolean isClosed() {
return pipe.isClosed() && !pipe.hasMore();
}
......
......@@ -29,8 +29,8 @@ import teetime.framework.signal.TerminatingSignal;
*/
public final class OutputPort<T> extends AbstractPort<T> {
OutputPort(final Class<T> type) {
super(type);
OutputPort(final Class<T> type, final Stage owningStage) {
super(type, owningStage);
}
/**
......
......@@ -16,7 +16,6 @@
package teetime.stage.io;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import teetime.framework.AbstractCompositeStage;
......@@ -57,9 +56,4 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage {
return distributor;
}
@Override
protected Collection<? extends Stage> getLastStages() {
return lastStages;
}
}
......@@ -16,7 +16,6 @@
package teetime.stage.string;
import java.util.ArrayList;
import java.util.Collection;
import teetime.framework.AbstractCompositeStage;
import teetime.framework.InputPort;
......@@ -56,11 +55,6 @@ public final class WordCounter extends AbstractCompositeStage {
return this.tokenizer;
}
@Override
protected Collection<? extends Stage> getLastStages() {
return this.lastStages;
}
public InputPort<String> getInputPort() {
return this.tokenizer.getInputPort();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment