diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 8ddda87b70fbe9e0f6131ddb4ded8cadc57535fe..84ced579f485f65e7ae0e20cf783001dd7f64bd7 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -39,19 +39,10 @@ public abstract class AbstractCompositeStage extends Stage { private static final IPipeFactory INTRA_PIPE_FACTORY = PipeFactoryRegistry.INSTANCE .getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - private final List<OutputPort<?>> outputPorts; - protected abstract Stage getFirstStage(); protected abstract Collection<? extends Stage> getLastStages(); - public AbstractCompositeStage() { - outputPorts = new ArrayList<OutputPort<?>>(); - for (final Stage s : getLastStages()) { - outputPorts.addAll(Arrays.asList(s.getOutputPorts())); - } - } - @Override protected final void executeStage() { getFirstStage().executeStage(); @@ -84,6 +75,10 @@ public abstract class AbstractCompositeStage extends Stage { @Override protected OutputPort<?>[] getOutputPorts() { + List<OutputPort<?>> outputPorts = new ArrayList<OutputPort<?>>(); + for (final Stage s : getLastStages()) { + outputPorts.addAll(Arrays.asList(s.getOutputPorts())); + } return outputPorts.toArray(new OutputPort[0]); } diff --git a/src/main/java/teetime/stage/string/WordCounter.java b/src/main/java/teetime/stage/string/WordCounter.java index 2c88aa6ebcb4373a2d0cf02765075a74188f94d0..41813781c255accec627260c3d6fb66307665194 100644 --- a/src/main/java/teetime/stage/string/WordCounter.java +++ b/src/main/java/teetime/stage/string/WordCounter.java @@ -44,7 +44,6 @@ public final class WordCounter extends AbstractCompositeStage { // The connection of the different stages is realized within the construction of a instance of this class. public WordCounter() { this.lastStages.add(this.mapCounter); - final ToLowerCase toLowerCase = new ToLowerCase(); connectStages(this.tokenizer.getOutputPort(), toLowerCase.getInputPort()); diff --git a/src/test/java/teetime/framework/TraversorTest.java b/src/test/java/teetime/framework/TraversorTest.java index 390b20073b23393f988d648d20ea6929a5f1185c..19ac88ca121365481f6ac1cb4b181da9f2f01178 100644 --- a/src/test/java/teetime/framework/TraversorTest.java +++ b/src/test/java/teetime/framework/TraversorTest.java @@ -34,7 +34,7 @@ public class TraversorTest { public final InitialElementProducer<File> init; public TestConfiguration() { - int threads = 4; + int threads = 1; init = new InitialElementProducer<File>(new File("")); // final File2Lines f2b = new File2Lines(); final File2SeqOfWords f2b = new File2SeqOfWords("UTF-8", 512); @@ -57,12 +57,11 @@ public class TraversorTest { // final InputPortSizePrinter<String> inputPortSizePrinter = new InputPortSizePrinter<String>(); final WordCounter wc = new WordCounter(); // intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort()); - final WordCounter threadableStage = wc; - final IPipe distributorPipe = interFact.create(distributor.getNewOutputPort(), threadableStage.getInputPort(), 10000); + final IPipe distributorPipe = interFact.create(distributor.getNewOutputPort(), wc.getInputPort(), 10000); final IPipe mergerPipe = interFact.create(wc.getOutputPort(), merger.getNewInputPort()); // Add WordCounter as a threadable stage, so it runs in its own thread - addThreadableStage(threadableStage); + addThreadableStage(wc); }