Skip to content
Snippets Groups Projects
Commit 6efde3a2 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

fixed bug in ACompositeStage

parent 66dac4aa
No related branches found
No related tags found
No related merge requests found
......@@ -39,19 +39,10 @@ public abstract class AbstractCompositeStage extends Stage {
private static final IPipeFactory INTRA_PIPE_FACTORY = PipeFactoryRegistry.INSTANCE
.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
private final List<OutputPort<?>> outputPorts;
protected abstract Stage getFirstStage();
protected abstract Collection<? extends Stage> getLastStages();
public AbstractCompositeStage() {
outputPorts = new ArrayList<OutputPort<?>>();
for (final Stage s : getLastStages()) {
outputPorts.addAll(Arrays.asList(s.getOutputPorts()));
}
}
@Override
protected final void executeStage() {
getFirstStage().executeStage();
......@@ -84,6 +75,10 @@ public abstract class AbstractCompositeStage extends Stage {
@Override
protected OutputPort<?>[] getOutputPorts() {
List<OutputPort<?>> outputPorts = new ArrayList<OutputPort<?>>();
for (final Stage s : getLastStages()) {
outputPorts.addAll(Arrays.asList(s.getOutputPorts()));
}
return outputPorts.toArray(new OutputPort[0]);
}
......
......@@ -44,7 +44,6 @@ public final class WordCounter extends AbstractCompositeStage {
// The connection of the different stages is realized within the construction of a instance of this class.
public WordCounter() {
this.lastStages.add(this.mapCounter);
final ToLowerCase toLowerCase = new ToLowerCase();
connectStages(this.tokenizer.getOutputPort(), toLowerCase.getInputPort());
......
......@@ -34,7 +34,7 @@ public class TraversorTest {
public final InitialElementProducer<File> init;
public TestConfiguration() {
int threads = 4;
int threads = 1;
init = new InitialElementProducer<File>(new File(""));
// final File2Lines f2b = new File2Lines();
final File2SeqOfWords f2b = new File2SeqOfWords("UTF-8", 512);
......@@ -57,12 +57,11 @@ public class TraversorTest {
// final InputPortSizePrinter<String> inputPortSizePrinter = new InputPortSizePrinter<String>();
final WordCounter wc = new WordCounter();
// intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort());
final WordCounter threadableStage = wc;
final IPipe distributorPipe = interFact.create(distributor.getNewOutputPort(), threadableStage.getInputPort(), 10000);
final IPipe distributorPipe = interFact.create(distributor.getNewOutputPort(), wc.getInputPort(), 10000);
final IPipe mergerPipe = interFact.create(wc.getOutputPort(), merger.getNewInputPort());
// Add WordCounter as a threadable stage, so it runs in its own thread
addThreadableStage(threadableStage);
addThreadableStage(wc);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment