Skip to content
Snippets Groups Projects
Commit b527460c authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

modified the MapMerger to be connected to a generic merger;

renamed DistributedMapCounter to MappingCounter;
added first draft of WordCounter
parent 758d3486
No related branches found
No related tags found
No related merge requests found
......@@ -32,7 +32,7 @@ import teetime.framework.validation.InvalidPortConnection;
@SuppressWarnings("PMD.AbstractNaming")
public abstract class CompositeStage extends Stage {
private static PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE;
protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE;
protected abstract Stage getFirstStage();
......
......@@ -37,17 +37,6 @@ public class CountingMapMerger<T> extends AbstractConsumerStage<CountingMap<T>>
private final CountingMap<T> result = new CountingMap<T>();
private final OutputPort<Map<T, Integer>> port = createOutputPort();
@SuppressWarnings("unused")
// May be needed to identify, if all stages before this one terminated
private final int numberOfInputPorts;
public CountingMapMerger(final int numberOfInputPorts) {
for (int i = 1; i < numberOfInputPorts; i++) {
createInputPort();
}
this.numberOfInputPorts = numberOfInputPorts;
}
@Override
protected void execute(final CountingMap<T> element) {
Set<Map.Entry<T, Integer>> entries = element.entrySet();
......
......@@ -28,12 +28,12 @@ import teetime.stage.util.CountingMap;
* @param <T>
* Type to be count
*/
public class DistributedMapCounter<T> extends AbstractConsumerStage<T> {
public class MappingCounter<T> extends AbstractConsumerStage<T> {
private final CountingMap<T> counter = new CountingMap<T>();
private final OutputPort<CountingMap<T>> port = createOutputPort();
public DistributedMapCounter() {
public MappingCounter() {
}
......@@ -49,4 +49,8 @@ public class DistributedMapCounter<T> extends AbstractConsumerStage<T> {
super.onTerminating();
}
public OutputPort<CountingMap<T>> getOutputPort() {
return port;
}
}
......@@ -34,4 +34,8 @@ public class ToLowerCase extends AbstractConsumerStage<String> {
}
public OutputPort<String> getOutputPort() {
return outputPort;
}
}
package teetime.stage.string;
import java.util.ArrayList;
import java.util.Collection;
import teetime.framework.CompositeStage;
import teetime.framework.Stage;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.MappingCounter;
/**
* Intermediate stage, which receives texts and counts the occurring words.
* The result is passed on upon termination.
*
* @since 1.1
*
* @author Nelson Tavares de Sousa
*
*/
public class WordCounter extends CompositeStage {
private final Tokenizer tokenizer = new Tokenizer(" ");
private final MappingCounter<String> mapCounter = new MappingCounter<String>();
private final ArrayList<Stage> lastStages = new ArrayList<Stage>();
public WordCounter() {
lastStages.add(mapCounter);
IPipeFactory pipeFact = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
ToLowerCase toLowerCase = new ToLowerCase();
pipeFact.create(tokenizer.getOutputPort(), toLowerCase.getInputPort());
pipeFact.create(toLowerCase.getOutputPort(), mapCounter.getInputPort());
}
@Override
protected Stage getFirstStage() {
return tokenizer;
}
@Override
protected Collection<? extends Stage> getLastStages() {
return lastStages;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment