From b527460c1d0995e129e3df9f683233020340aacf Mon Sep 17 00:00:00 2001 From: Nelson Tavares de Sousa <ntd@informatik.uni-kiel.de> Date: Tue, 17 Feb 2015 15:34:38 +0100 Subject: [PATCH] modified the MapMerger to be connected to a generic merger; renamed DistributedMapCounter to MappingCounter; added first draft of WordCounter --- .../teetime/framework/CompositeStage.java | 2 +- .../java/teetime/stage/CountingMapMerger.java | 11 ----- ...tedMapCounter.java => MappingCounter.java} | 8 +++- .../teetime/stage/string/ToLowerCase.java | 4 ++ .../teetime/stage/string/WordCounter.java | 48 +++++++++++++++++++ 5 files changed, 59 insertions(+), 14 deletions(-) rename src/main/java/teetime/stage/{DistributedMapCounter.java => MappingCounter.java} (89%) create mode 100644 src/main/java/teetime/stage/string/WordCounter.java diff --git a/src/main/java/teetime/framework/CompositeStage.java b/src/main/java/teetime/framework/CompositeStage.java index 83469d0f..98e011b2 100644 --- a/src/main/java/teetime/framework/CompositeStage.java +++ b/src/main/java/teetime/framework/CompositeStage.java @@ -32,7 +32,7 @@ import teetime.framework.validation.InvalidPortConnection; @SuppressWarnings("PMD.AbstractNaming") public abstract class CompositeStage extends Stage { - private static PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; + protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; protected abstract Stage getFirstStage(); diff --git a/src/main/java/teetime/stage/CountingMapMerger.java b/src/main/java/teetime/stage/CountingMapMerger.java index 99225cfb..75a9daea 100644 --- a/src/main/java/teetime/stage/CountingMapMerger.java +++ b/src/main/java/teetime/stage/CountingMapMerger.java @@ -37,17 +37,6 @@ public class CountingMapMerger<T> extends AbstractConsumerStage<CountingMap<T>> private final CountingMap<T> result = new CountingMap<T>(); private final OutputPort<Map<T, Integer>> port = createOutputPort(); - @SuppressWarnings("unused") - // May be needed to identify, if all stages before this one terminated - private final int numberOfInputPorts; - - public CountingMapMerger(final int numberOfInputPorts) { - for (int i = 1; i < numberOfInputPorts; i++) { - createInputPort(); - } - this.numberOfInputPorts = numberOfInputPorts; - } - @Override protected void execute(final CountingMap<T> element) { Set<Map.Entry<T, Integer>> entries = element.entrySet(); diff --git a/src/main/java/teetime/stage/DistributedMapCounter.java b/src/main/java/teetime/stage/MappingCounter.java similarity index 89% rename from src/main/java/teetime/stage/DistributedMapCounter.java rename to src/main/java/teetime/stage/MappingCounter.java index ebb3c0c3..370c6a1c 100644 --- a/src/main/java/teetime/stage/DistributedMapCounter.java +++ b/src/main/java/teetime/stage/MappingCounter.java @@ -28,12 +28,12 @@ import teetime.stage.util.CountingMap; * @param <T> * Type to be count */ -public class DistributedMapCounter<T> extends AbstractConsumerStage<T> { +public class MappingCounter<T> extends AbstractConsumerStage<T> { private final CountingMap<T> counter = new CountingMap<T>(); private final OutputPort<CountingMap<T>> port = createOutputPort(); - public DistributedMapCounter() { + public MappingCounter() { } @@ -49,4 +49,8 @@ public class DistributedMapCounter<T> extends AbstractConsumerStage<T> { super.onTerminating(); } + public OutputPort<CountingMap<T>> getOutputPort() { + return port; + } + } diff --git a/src/main/java/teetime/stage/string/ToLowerCase.java b/src/main/java/teetime/stage/string/ToLowerCase.java index 5f5bc840..5f97ad5e 100644 --- a/src/main/java/teetime/stage/string/ToLowerCase.java +++ b/src/main/java/teetime/stage/string/ToLowerCase.java @@ -34,4 +34,8 @@ public class ToLowerCase extends AbstractConsumerStage<String> { } + public OutputPort<String> getOutputPort() { + return outputPort; + } + } diff --git a/src/main/java/teetime/stage/string/WordCounter.java b/src/main/java/teetime/stage/string/WordCounter.java new file mode 100644 index 00000000..09f40e0f --- /dev/null +++ b/src/main/java/teetime/stage/string/WordCounter.java @@ -0,0 +1,48 @@ +package teetime.stage.string; + +import java.util.ArrayList; +import java.util.Collection; + +import teetime.framework.CompositeStage; +import teetime.framework.Stage; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.stage.MappingCounter; + +/** + * Intermediate stage, which receives texts and counts the occurring words. + * The result is passed on upon termination. + * + * @since 1.1 + * + * @author Nelson Tavares de Sousa + * + */ +public class WordCounter extends CompositeStage { + + private final Tokenizer tokenizer = new Tokenizer(" "); + private final MappingCounter<String> mapCounter = new MappingCounter<String>(); + private final ArrayList<Stage> lastStages = new ArrayList<Stage>(); + + public WordCounter() { + lastStages.add(mapCounter); + + IPipeFactory pipeFact = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + ToLowerCase toLowerCase = new ToLowerCase(); + pipeFact.create(tokenizer.getOutputPort(), toLowerCase.getInputPort()); + pipeFact.create(toLowerCase.getOutputPort(), mapCounter.getInputPort()); + } + + @Override + protected Stage getFirstStage() { + return tokenizer; + } + + @Override + protected Collection<? extends Stage> getLastStages() { + return lastStages; + } + +} -- GitLab