diff --git a/src/main/java/teetime/stage/CountingMapMerger.java b/src/main/java/teetime/stage/CountingMapMerger.java index 75a9daea35dcee12c6c6acb91d32c38f25aed29e..0b4ec5f1e84ccd6a7a4f0410e24ab02133848365 100644 --- a/src/main/java/teetime/stage/CountingMapMerger.java +++ b/src/main/java/teetime/stage/CountingMapMerger.java @@ -47,6 +47,7 @@ public class CountingMapMerger<T> extends AbstractConsumerStage<CountingMap<T>> @Override public void onTerminating() throws Exception { + System.out.println("TERMINATE"); port.send(result); super.onTerminating(); } diff --git a/src/main/java/teetime/stage/string/WordCounter.java b/src/main/java/teetime/stage/string/WordCounter.java index 09f40e0f64969fdc53bf54055ed04cf5976a20bf..271e0d552a87eb0d9e6b3973afdb14b01b967035 100644 --- a/src/main/java/teetime/stage/string/WordCounter.java +++ b/src/main/java/teetime/stage/string/WordCounter.java @@ -4,12 +4,15 @@ import java.util.ArrayList; import java.util.Collection; import teetime.framework.CompositeStage; +import teetime.framework.InputPort; +import teetime.framework.OutputPort; import teetime.framework.Stage; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.MappingCounter; +import teetime.stage.util.CountingMap; /** * Intermediate stage, which receives texts and counts the occurring words. @@ -45,4 +48,12 @@ public class WordCounter extends CompositeStage { return lastStages; } + public InputPort<String> getInputPort() { + return tokenizer.getInputPort(); + } + + public OutputPort<CountingMap<String>> getOutputPort() { + return mapCounter.getOutputPort(); + } + } diff --git a/src/test/java/teetime/stage/WordCountingConfiguration.java b/src/test/java/teetime/stage/WordCountingConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..f238ea296890c68bdbcbd24504b49b38d3528ecc --- /dev/null +++ b/src/test/java/teetime/stage/WordCountingConfiguration.java @@ -0,0 +1,48 @@ +package teetime.stage; + +import java.io.File; + +import teetime.framework.AnalysisConfiguration; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.stage.basic.distributor.Distributor; +import teetime.stage.basic.merger.Merger; +import teetime.stage.io.File2ByteArray; +import teetime.stage.string.WordCounter; +import teetime.stage.util.CountingMap; + +public class WordCountingConfiguration extends AnalysisConfiguration { + + private final CountingMapMerger<String> result = new CountingMapMerger<String>(); + + public WordCountingConfiguration(final File input/* TODO: scale to i threads */) { + final InitialElementProducer<File> init = new InitialElementProducer<File>(input); + final File2ByteArray f2b = new File2ByteArray(); + final ByteArray2String b2s = new ByteArray2String(); + final Distributor<String> dist = new Distributor<String>(); + + final WordCounter wc = new WordCounter(); + + final Merger<CountingMap<String>> merger = new Merger<CountingMap<String>>(); + // result + IPipeFactory interFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + + interFact.create(init.getOutputPort(), f2b.getInputPort()); + interFact.create(f2b.getOutputPort(), b2s.getInputPort()); + interFact.create(b2s.getOutputPort(), dist.getInputPort()); + + // scale + intraFact.create(dist.getNewOutputPort(), wc.getInputPort()); + intraFact.create(wc.getOutputPort(), merger.getNewInputPort()); + + interFact.create(merger.getOutputPort(), result.getInputPort()); + + } + + public CountingMap<String> getResult() { + return result.getResult(); + } + +} diff --git a/src/test/java/teetime/stage/WordCountingTest.java b/src/test/java/teetime/stage/WordCountingTest.java new file mode 100644 index 0000000000000000000000000000000000000000..4e584d3e08ea6021c6bf5ccb043ea3ddc5c39bfa --- /dev/null +++ b/src/test/java/teetime/stage/WordCountingTest.java @@ -0,0 +1,18 @@ +package teetime.stage; + +import java.io.File; + +import org.junit.Test; + +import teetime.framework.Analysis; + +public class WordCountingTest { + + @Test + public void test1() { + WordCountingConfiguration wcc = new WordCountingConfiguration(new File("src/test/resources/data/output.txt")); + Analysis analysis = new Analysis(wcc); + analysis.start(); + System.out.println(wcc.getResult().size()); + } +}