Skip to content
Snippets Groups Projects
Commit 73649526 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

test prototype, not working yet, no scaling

parent 3d9e4568
No related branches found
No related tags found
No related merge requests found
......@@ -47,6 +47,7 @@ public class CountingMapMerger<T> extends AbstractConsumerStage<CountingMap<T>>
@Override
public void onTerminating() throws Exception {
System.out.println("TERMINATE");
port.send(result);
super.onTerminating();
}
......
......@@ -4,12 +4,15 @@ import java.util.ArrayList;
import java.util.Collection;
import teetime.framework.CompositeStage;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.MappingCounter;
import teetime.stage.util.CountingMap;
/**
* Intermediate stage, which receives texts and counts the occurring words.
......@@ -45,4 +48,12 @@ public class WordCounter extends CompositeStage {
return lastStages;
}
public InputPort<String> getInputPort() {
return tokenizer.getInputPort();
}
public OutputPort<CountingMap<String>> getOutputPort() {
return mapCounter.getOutputPort();
}
}
package teetime.stage;
import java.io.File;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.basic.merger.Merger;
import teetime.stage.io.File2ByteArray;
import teetime.stage.string.WordCounter;
import teetime.stage.util.CountingMap;
public class WordCountingConfiguration extends AnalysisConfiguration {
private final CountingMapMerger<String> result = new CountingMapMerger<String>();
public WordCountingConfiguration(final File input/* TODO: scale to i threads */) {
final InitialElementProducer<File> init = new InitialElementProducer<File>(input);
final File2ByteArray f2b = new File2ByteArray();
final ByteArray2String b2s = new ByteArray2String();
final Distributor<String> dist = new Distributor<String>();
final WordCounter wc = new WordCounter();
final Merger<CountingMap<String>> merger = new Merger<CountingMap<String>>();
// result
IPipeFactory interFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interFact.create(init.getOutputPort(), f2b.getInputPort());
interFact.create(f2b.getOutputPort(), b2s.getInputPort());
interFact.create(b2s.getOutputPort(), dist.getInputPort());
// scale
intraFact.create(dist.getNewOutputPort(), wc.getInputPort());
intraFact.create(wc.getOutputPort(), merger.getNewInputPort());
interFact.create(merger.getOutputPort(), result.getInputPort());
}
public CountingMap<String> getResult() {
return result.getResult();
}
}
package teetime.stage;
import java.io.File;
import org.junit.Test;
import teetime.framework.Analysis;
public class WordCountingTest {
@Test
public void test1() {
WordCountingConfiguration wcc = new WordCountingConfiguration(new File("src/test/resources/data/output.txt"));
Analysis analysis = new Analysis(wcc);
analysis.start();
System.out.println(wcc.getResult().size());
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment