diff --git a/src/main/java/teetime/framework/CompositeStage.java b/src/main/java/teetime/framework/CompositeStage.java index 83469d0f89333145506da5fee87822a72d871b8d..98e011b245b1dc5e484e7d89fcc2919299d7aea9 100644 --- a/src/main/java/teetime/framework/CompositeStage.java +++ b/src/main/java/teetime/framework/CompositeStage.java @@ -32,7 +32,7 @@ import teetime.framework.validation.InvalidPortConnection; @SuppressWarnings("PMD.AbstractNaming") public abstract class CompositeStage extends Stage { - private static PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; + protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; protected abstract Stage getFirstStage(); diff --git a/src/main/java/teetime/stage/CountingMapMerger.java b/src/main/java/teetime/stage/CountingMapMerger.java index 99225cfb92bb9e0e21135371b053575af7411a1e..75a9daea35dcee12c6c6acb91d32c38f25aed29e 100644 --- a/src/main/java/teetime/stage/CountingMapMerger.java +++ b/src/main/java/teetime/stage/CountingMapMerger.java @@ -37,17 +37,6 @@ public class CountingMapMerger<T> extends AbstractConsumerStage<CountingMap<T>> private final CountingMap<T> result = new CountingMap<T>(); private final OutputPort<Map<T, Integer>> port = createOutputPort(); - @SuppressWarnings("unused") - // May be needed to identify, if all stages before this one terminated - private final int numberOfInputPorts; - - public CountingMapMerger(final int numberOfInputPorts) { - for (int i = 1; i < numberOfInputPorts; i++) { - createInputPort(); - } - this.numberOfInputPorts = numberOfInputPorts; - } - @Override protected void execute(final CountingMap<T> element) { Set<Map.Entry<T, Integer>> entries = element.entrySet(); diff --git a/src/main/java/teetime/stage/DistributedMapCounter.java b/src/main/java/teetime/stage/MappingCounter.java similarity index 89% rename from src/main/java/teetime/stage/DistributedMapCounter.java rename to src/main/java/teetime/stage/MappingCounter.java index ebb3c0c3a0186512f01fe7f686da4821fed9bd64..370c6a1cb06508cd2eeb9b7663edb2030653b937 100644 --- a/src/main/java/teetime/stage/DistributedMapCounter.java +++ b/src/main/java/teetime/stage/MappingCounter.java @@ -28,12 +28,12 @@ import teetime.stage.util.CountingMap; * @param <T> * Type to be count */ -public class DistributedMapCounter<T> extends AbstractConsumerStage<T> { +public class MappingCounter<T> extends AbstractConsumerStage<T> { private final CountingMap<T> counter = new CountingMap<T>(); private final OutputPort<CountingMap<T>> port = createOutputPort(); - public DistributedMapCounter() { + public MappingCounter() { } @@ -49,4 +49,8 @@ public class DistributedMapCounter<T> extends AbstractConsumerStage<T> { super.onTerminating(); } + public OutputPort<CountingMap<T>> getOutputPort() { + return port; + } + } diff --git a/src/main/java/teetime/stage/string/ToLowerCase.java b/src/main/java/teetime/stage/string/ToLowerCase.java index 5f5bc840fe6f9a5692c401c376c23d4cf212bbb1..5f97ad5e09d2bec7945c7cd1766806db55ee989b 100644 --- a/src/main/java/teetime/stage/string/ToLowerCase.java +++ b/src/main/java/teetime/stage/string/ToLowerCase.java @@ -34,4 +34,8 @@ public class ToLowerCase extends AbstractConsumerStage<String> { } + public OutputPort<String> getOutputPort() { + return outputPort; + } + } diff --git a/src/main/java/teetime/stage/string/WordCounter.java b/src/main/java/teetime/stage/string/WordCounter.java new file mode 100644 index 0000000000000000000000000000000000000000..09f40e0f64969fdc53bf54055ed04cf5976a20bf --- /dev/null +++ b/src/main/java/teetime/stage/string/WordCounter.java @@ -0,0 +1,48 @@ +package teetime.stage.string; + +import java.util.ArrayList; +import java.util.Collection; + +import teetime.framework.CompositeStage; +import teetime.framework.Stage; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.stage.MappingCounter; + +/** + * Intermediate stage, which receives texts and counts the occurring words. + * The result is passed on upon termination. + * + * @since 1.1 + * + * @author Nelson Tavares de Sousa + * + */ +public class WordCounter extends CompositeStage { + + private final Tokenizer tokenizer = new Tokenizer(" "); + private final MappingCounter<String> mapCounter = new MappingCounter<String>(); + private final ArrayList<Stage> lastStages = new ArrayList<Stage>(); + + public WordCounter() { + lastStages.add(mapCounter); + + IPipeFactory pipeFact = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + ToLowerCase toLowerCase = new ToLowerCase(); + pipeFact.create(tokenizer.getOutputPort(), toLowerCase.getInputPort()); + pipeFact.create(toLowerCase.getOutputPort(), mapCounter.getInputPort()); + } + + @Override + protected Stage getFirstStage() { + return tokenizer; + } + + @Override + protected Collection<? extends Stage> getLastStages() { + return lastStages; + } + +}