From 73649526be8eaef3fdec098f015e48ee5c0a1bbd Mon Sep 17 00:00:00 2001 From: Nelson Tavares de Sousa <ntd@informatik.uni-kiel.de> Date: Tue, 17 Feb 2015 16:08:37 +0100 Subject: [PATCH] test prototype, not working yet, no scaling --- .../java/teetime/stage/CountingMapMerger.java | 1 + .../teetime/stage/string/WordCounter.java | 11 +++++ .../stage/WordCountingConfiguration.java | 48 +++++++++++++++++++ .../java/teetime/stage/WordCountingTest.java | 18 +++++++ 4 files changed, 78 insertions(+) create mode 100644 src/test/java/teetime/stage/WordCountingConfiguration.java create mode 100644 src/test/java/teetime/stage/WordCountingTest.java diff --git a/src/main/java/teetime/stage/CountingMapMerger.java b/src/main/java/teetime/stage/CountingMapMerger.java index 75a9daea..0b4ec5f1 100644 --- a/src/main/java/teetime/stage/CountingMapMerger.java +++ b/src/main/java/teetime/stage/CountingMapMerger.java @@ -47,6 +47,7 @@ public class CountingMapMerger<T> extends AbstractConsumerStage<CountingMap<T>> @Override public void onTerminating() throws Exception { + System.out.println("TERMINATE"); port.send(result); super.onTerminating(); } diff --git a/src/main/java/teetime/stage/string/WordCounter.java b/src/main/java/teetime/stage/string/WordCounter.java index 09f40e0f..271e0d55 100644 --- a/src/main/java/teetime/stage/string/WordCounter.java +++ b/src/main/java/teetime/stage/string/WordCounter.java @@ -4,12 +4,15 @@ import java.util.ArrayList; import java.util.Collection; import teetime.framework.CompositeStage; +import teetime.framework.InputPort; +import teetime.framework.OutputPort; import teetime.framework.Stage; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.MappingCounter; +import teetime.stage.util.CountingMap; /** * Intermediate stage, which receives texts and counts the occurring words. @@ -45,4 +48,12 @@ public class WordCounter extends CompositeStage { return lastStages; } + public InputPort<String> getInputPort() { + return tokenizer.getInputPort(); + } + + public OutputPort<CountingMap<String>> getOutputPort() { + return mapCounter.getOutputPort(); + } + } diff --git a/src/test/java/teetime/stage/WordCountingConfiguration.java b/src/test/java/teetime/stage/WordCountingConfiguration.java new file mode 100644 index 00000000..f238ea29 --- /dev/null +++ b/src/test/java/teetime/stage/WordCountingConfiguration.java @@ -0,0 +1,48 @@ +package teetime.stage; + +import java.io.File; + +import teetime.framework.AnalysisConfiguration; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.stage.basic.distributor.Distributor; +import teetime.stage.basic.merger.Merger; +import teetime.stage.io.File2ByteArray; +import teetime.stage.string.WordCounter; +import teetime.stage.util.CountingMap; + +public class WordCountingConfiguration extends AnalysisConfiguration { + + private final CountingMapMerger<String> result = new CountingMapMerger<String>(); + + public WordCountingConfiguration(final File input/* TODO: scale to i threads */) { + final InitialElementProducer<File> init = new InitialElementProducer<File>(input); + final File2ByteArray f2b = new File2ByteArray(); + final ByteArray2String b2s = new ByteArray2String(); + final Distributor<String> dist = new Distributor<String>(); + + final WordCounter wc = new WordCounter(); + + final Merger<CountingMap<String>> merger = new Merger<CountingMap<String>>(); + // result + IPipeFactory interFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + + interFact.create(init.getOutputPort(), f2b.getInputPort()); + interFact.create(f2b.getOutputPort(), b2s.getInputPort()); + interFact.create(b2s.getOutputPort(), dist.getInputPort()); + + // scale + intraFact.create(dist.getNewOutputPort(), wc.getInputPort()); + intraFact.create(wc.getOutputPort(), merger.getNewInputPort()); + + interFact.create(merger.getOutputPort(), result.getInputPort()); + + } + + public CountingMap<String> getResult() { + return result.getResult(); + } + +} diff --git a/src/test/java/teetime/stage/WordCountingTest.java b/src/test/java/teetime/stage/WordCountingTest.java new file mode 100644 index 00000000..4e584d3e --- /dev/null +++ b/src/test/java/teetime/stage/WordCountingTest.java @@ -0,0 +1,18 @@ +package teetime.stage; + +import java.io.File; + +import org.junit.Test; + +import teetime.framework.Analysis; + +public class WordCountingTest { + + @Test + public void test1() { + WordCountingConfiguration wcc = new WordCountingConfiguration(new File("src/test/resources/data/output.txt")); + Analysis analysis = new Analysis(wcc); + analysis.start(); + System.out.println(wcc.getResult().size()); + } +} -- GitLab