Skip to content
Snippets Groups Projects
Commit a9bed829 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

The ownningThread of the first stage within a CompositeStage, will be

set to the CompositeStage's thread;
first scaling test prototype
parent 7016e6c7
No related branches found
No related tags found
No related merge requests found
...@@ -84,4 +84,10 @@ public abstract class CompositeStage extends Stage { ...@@ -84,4 +84,10 @@ public abstract class CompositeStage extends Stage {
return isStarted; return isStarted;
} }
@Override
void setOwningThread(final Thread owningThread) {
getFirstStage().setOwningThread(owningThread);
super.setOwningThread(owningThread);
}
} }
...@@ -27,6 +27,8 @@ import teetime.stage.util.CountingMap; ...@@ -27,6 +27,8 @@ import teetime.stage.util.CountingMap;
* Receives different CountingMap instances and merges them into a single one. * Receives different CountingMap instances and merges them into a single one.
* The result is sent upon termination. * The result is sent upon termination.
* *
* @since 1.1
*
* @author Nelson Tavares de Sousa * @author Nelson Tavares de Sousa
* *
* @param <T> * @param <T>
......
...@@ -23,6 +23,8 @@ import teetime.stage.util.CountingMap; ...@@ -23,6 +23,8 @@ import teetime.stage.util.CountingMap;
* This counts how many of different elements are sent to this stage. Nothing is forwarded. * This counts how many of different elements are sent to this stage. Nothing is forwarded.
* On termination a CountingMap is sent to its outputport. * On termination a CountingMap is sent to its outputport.
* *
* @since 1.1
*
* @author Nelson Tavares de Sousa * @author Nelson Tavares de Sousa
* *
* @param <T> * @param <T>
......
...@@ -22,6 +22,8 @@ import teetime.framework.OutputPort; ...@@ -22,6 +22,8 @@ import teetime.framework.OutputPort;
* Receives a string and passes it on to the next stage only with lower case letters. * Receives a string and passes it on to the next stage only with lower case letters.
* Punctuation and similar characters will be removed. Only [a-zA-Z ] will be passed on. * Punctuation and similar characters will be removed. Only [a-zA-Z ] will be passed on.
* *
* @since 1.1
*
* @author Nelson Tavares de Sousa * @author Nelson Tavares de Sousa
* *
*/ */
......
...@@ -21,6 +21,8 @@ import java.util.HashMap; ...@@ -21,6 +21,8 @@ import java.util.HashMap;
* An implementation of HashMap which can be used to count the occurrence of different keys. * An implementation of HashMap which can be used to count the occurrence of different keys.
* This conaitns all methods of HashMap, but is enhanched with the {@link #add(T, Integer)} and {@link #increment(T)} methods. * This conaitns all methods of HashMap, but is enhanched with the {@link #add(T, Integer)} and {@link #increment(T)} methods.
* *
* @since 1.1
*
* @author Nelson Tavares de Sousa * @author Nelson Tavares de Sousa
* *
* @param <T> * @param <T>
......
...@@ -16,31 +16,34 @@ public class WordCountingConfiguration extends AnalysisConfiguration { ...@@ -16,31 +16,34 @@ public class WordCountingConfiguration extends AnalysisConfiguration {
private final CountingMapMerger<String> result = new CountingMapMerger<String>(); private final CountingMapMerger<String> result = new CountingMapMerger<String>();
public WordCountingConfiguration(final File input/* TODO: scale to i threads */) { public WordCountingConfiguration(final File input, final int threads) {
final InitialElementProducer<File> init = new InitialElementProducer<File>(input); final InitialElementProducer<File> init = new InitialElementProducer<File>(input);
final File2ByteArray f2b = new File2ByteArray(); final File2ByteArray f2b = new File2ByteArray();
final ByteArray2String b2s = new ByteArray2String(); final ByteArray2String b2s = new ByteArray2String();
final Distributor<String> dist = new Distributor<String>(); final Distributor<String> dist = new Distributor<String>();
final WordCounter wc = new WordCounter();
final Merger<CountingMap<String>> merger = new Merger<CountingMap<String>>(); final Merger<CountingMap<String>> merger = new Merger<CountingMap<String>>();
// result // result
IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); IPipeFactory interFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
IPipeFactory interFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interFact.create(init.getOutputPort(), f2b.getInputPort()); intraFact.create(init.getOutputPort(), f2b.getInputPort());
interFact.create(f2b.getOutputPort(), b2s.getInputPort()); intraFact.create(f2b.getOutputPort(), b2s.getInputPort());
interFact.create(b2s.getOutputPort(), dist.getInputPort()); intraFact.create(b2s.getOutputPort(), dist.getInputPort());
// scale // scale
intraFact.create(dist.getNewOutputPort(), wc.getInputPort()); WordCounter wc;
intraFact.create(wc.getOutputPort(), merger.getNewInputPort()); for (int i = 0; i < threads; i++) {
wc = new WordCounter();
interFact.create(dist.getNewOutputPort(), wc.getInputPort());
interFact.create(wc.getOutputPort(), merger.getNewInputPort());
addThreadableStage(wc);
}
interFact.create(merger.getOutputPort(), result.getInputPort()); intraFact.create(merger.getOutputPort(), result.getInputPort());
addThreadableStage(init); addThreadableStage(init);
addThreadableStage(wc);
addThreadableStage(merger); addThreadableStage(merger);
} }
......
...@@ -12,7 +12,7 @@ public class WordCountingTest { ...@@ -12,7 +12,7 @@ public class WordCountingTest {
@Test @Test
public void test1() { public void test1() {
WordCountingConfiguration wcc = new WordCountingConfiguration(new File("src/test/resources/data/output.txt")); WordCountingConfiguration wcc = new WordCountingConfiguration(new File("src/test/resources/data/output.txt"), 2);
Analysis analysis = new Analysis(wcc); Analysis analysis = new Analysis(wcc);
analysis.start(); analysis.start();
CountingMap<String> map = wcc.getResult(); CountingMap<String> map = wcc.getResult();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment