From 6efde3a215cfd2d57e03d7eaaac69cc1ad3ad457 Mon Sep 17 00:00:00 2001 From: Nelson Tavares de Sousa <ntd@informatik.uni-kiel.de> Date: Tue, 14 Apr 2015 11:04:05 +0200 Subject: [PATCH] fixed bug in ACompositeStage --- .../teetime/framework/AbstractCompositeStage.java | 13 ++++--------- src/main/java/teetime/stage/string/WordCounter.java | 1 - src/test/java/teetime/framework/TraversorTest.java | 7 +++---- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 8ddda87b..84ced579 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -39,19 +39,10 @@ public abstract class AbstractCompositeStage extends Stage { private static final IPipeFactory INTRA_PIPE_FACTORY = PipeFactoryRegistry.INSTANCE .getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - private final List<OutputPort<?>> outputPorts; - protected abstract Stage getFirstStage(); protected abstract Collection<? extends Stage> getLastStages(); - public AbstractCompositeStage() { - outputPorts = new ArrayList<OutputPort<?>>(); - for (final Stage s : getLastStages()) { - outputPorts.addAll(Arrays.asList(s.getOutputPorts())); - } - } - @Override protected final void executeStage() { getFirstStage().executeStage(); @@ -84,6 +75,10 @@ public abstract class AbstractCompositeStage extends Stage { @Override protected OutputPort<?>[] getOutputPorts() { + List<OutputPort<?>> outputPorts = new ArrayList<OutputPort<?>>(); + for (final Stage s : getLastStages()) { + outputPorts.addAll(Arrays.asList(s.getOutputPorts())); + } return outputPorts.toArray(new OutputPort[0]); } diff --git a/src/main/java/teetime/stage/string/WordCounter.java b/src/main/java/teetime/stage/string/WordCounter.java index 2c88aa6e..41813781 100644 --- a/src/main/java/teetime/stage/string/WordCounter.java +++ b/src/main/java/teetime/stage/string/WordCounter.java @@ -44,7 +44,6 @@ public final class WordCounter extends AbstractCompositeStage { // The connection of the different stages is realized within the construction of a instance of this class. public WordCounter() { this.lastStages.add(this.mapCounter); - final ToLowerCase toLowerCase = new ToLowerCase(); connectStages(this.tokenizer.getOutputPort(), toLowerCase.getInputPort()); diff --git a/src/test/java/teetime/framework/TraversorTest.java b/src/test/java/teetime/framework/TraversorTest.java index 390b2007..19ac88ca 100644 --- a/src/test/java/teetime/framework/TraversorTest.java +++ b/src/test/java/teetime/framework/TraversorTest.java @@ -34,7 +34,7 @@ public class TraversorTest { public final InitialElementProducer<File> init; public TestConfiguration() { - int threads = 4; + int threads = 1; init = new InitialElementProducer<File>(new File("")); // final File2Lines f2b = new File2Lines(); final File2SeqOfWords f2b = new File2SeqOfWords("UTF-8", 512); @@ -57,12 +57,11 @@ public class TraversorTest { // final InputPortSizePrinter<String> inputPortSizePrinter = new InputPortSizePrinter<String>(); final WordCounter wc = new WordCounter(); // intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort()); - final WordCounter threadableStage = wc; - final IPipe distributorPipe = interFact.create(distributor.getNewOutputPort(), threadableStage.getInputPort(), 10000); + final IPipe distributorPipe = interFact.create(distributor.getNewOutputPort(), wc.getInputPort(), 10000); final IPipe mergerPipe = interFact.create(wc.getOutputPort(), merger.getNewInputPort()); // Add WordCounter as a threadable stage, so it runs in its own thread - addThreadableStage(threadableStage); + addThreadableStage(wc); } -- GitLab