From a9bed829b4d80b6a019b14aa533a9d05e4b60f97 Mon Sep 17 00:00:00 2001 From: Nelson Tavares de Sousa <ntd@informatik.uni-kiel.de> Date: Tue, 17 Feb 2015 16:50:18 +0100 Subject: [PATCH] The ownningThread of the first stage within a CompositeStage, will be set to the CompositeStage's thread; first scaling test prototype --- .../teetime/framework/CompositeStage.java | 6 +++++ .../java/teetime/stage/CountingMapMerger.java | 2 ++ .../java/teetime/stage/MappingCounter.java | 2 ++ .../teetime/stage/string/ToLowerCase.java | 2 ++ .../java/teetime/stage/util/CountingMap.java | 2 ++ .../stage/WordCountingConfiguration.java | 27 ++++++++++--------- .../java/teetime/stage/WordCountingTest.java | 2 +- 7 files changed, 30 insertions(+), 13 deletions(-) diff --git a/src/main/java/teetime/framework/CompositeStage.java b/src/main/java/teetime/framework/CompositeStage.java index 98e011b2..7beee244 100644 --- a/src/main/java/teetime/framework/CompositeStage.java +++ b/src/main/java/teetime/framework/CompositeStage.java @@ -84,4 +84,10 @@ public abstract class CompositeStage extends Stage { return isStarted; } + @Override + void setOwningThread(final Thread owningThread) { + getFirstStage().setOwningThread(owningThread); + super.setOwningThread(owningThread); + } + } diff --git a/src/main/java/teetime/stage/CountingMapMerger.java b/src/main/java/teetime/stage/CountingMapMerger.java index 75a9daea..1ca07fb2 100644 --- a/src/main/java/teetime/stage/CountingMapMerger.java +++ b/src/main/java/teetime/stage/CountingMapMerger.java @@ -27,6 +27,8 @@ import teetime.stage.util.CountingMap; * Receives different CountingMap instances and merges them into a single one. * The result is sent upon termination. * + * @since 1.1 + * * @author Nelson Tavares de Sousa * * @param <T> diff --git a/src/main/java/teetime/stage/MappingCounter.java b/src/main/java/teetime/stage/MappingCounter.java index 370c6a1c..5f433696 100644 --- a/src/main/java/teetime/stage/MappingCounter.java +++ b/src/main/java/teetime/stage/MappingCounter.java @@ -23,6 +23,8 @@ import teetime.stage.util.CountingMap; * This counts how many of different elements are sent to this stage. Nothing is forwarded. * On termination a CountingMap is sent to its outputport. * + * @since 1.1 + * * @author Nelson Tavares de Sousa * * @param <T> diff --git a/src/main/java/teetime/stage/string/ToLowerCase.java b/src/main/java/teetime/stage/string/ToLowerCase.java index b3be8949..9841dbc1 100644 --- a/src/main/java/teetime/stage/string/ToLowerCase.java +++ b/src/main/java/teetime/stage/string/ToLowerCase.java @@ -22,6 +22,8 @@ import teetime.framework.OutputPort; * Receives a string and passes it on to the next stage only with lower case letters. * Punctuation and similar characters will be removed. Only [a-zA-Z ] will be passed on. * + * @since 1.1 + * * @author Nelson Tavares de Sousa * */ diff --git a/src/main/java/teetime/stage/util/CountingMap.java b/src/main/java/teetime/stage/util/CountingMap.java index fcd0ed66..b12bff76 100644 --- a/src/main/java/teetime/stage/util/CountingMap.java +++ b/src/main/java/teetime/stage/util/CountingMap.java @@ -21,6 +21,8 @@ import java.util.HashMap; * An implementation of HashMap which can be used to count the occurrence of different keys. * This conaitns all methods of HashMap, but is enhanched with the {@link #add(T, Integer)} and {@link #increment(T)} methods. * + * @since 1.1 + * * @author Nelson Tavares de Sousa * * @param <T> diff --git a/src/test/java/teetime/stage/WordCountingConfiguration.java b/src/test/java/teetime/stage/WordCountingConfiguration.java index de0fbc2f..a2512c6f 100644 --- a/src/test/java/teetime/stage/WordCountingConfiguration.java +++ b/src/test/java/teetime/stage/WordCountingConfiguration.java @@ -16,31 +16,34 @@ public class WordCountingConfiguration extends AnalysisConfiguration { private final CountingMapMerger<String> result = new CountingMapMerger<String>(); - public WordCountingConfiguration(final File input/* TODO: scale to i threads */) { + public WordCountingConfiguration(final File input, final int threads) { final InitialElementProducer<File> init = new InitialElementProducer<File>(input); final File2ByteArray f2b = new File2ByteArray(); final ByteArray2String b2s = new ByteArray2String(); final Distributor<String> dist = new Distributor<String>(); - final WordCounter wc = new WordCounter(); - final Merger<CountingMap<String>> merger = new Merger<CountingMap<String>>(); // result - IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); - IPipeFactory interFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + IPipeFactory interFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - interFact.create(init.getOutputPort(), f2b.getInputPort()); - interFact.create(f2b.getOutputPort(), b2s.getInputPort()); - interFact.create(b2s.getOutputPort(), dist.getInputPort()); + intraFact.create(init.getOutputPort(), f2b.getInputPort()); + intraFact.create(f2b.getOutputPort(), b2s.getInputPort()); + intraFact.create(b2s.getOutputPort(), dist.getInputPort()); // scale - intraFact.create(dist.getNewOutputPort(), wc.getInputPort()); - intraFact.create(wc.getOutputPort(), merger.getNewInputPort()); + WordCounter wc; + for (int i = 0; i < threads; i++) { + wc = new WordCounter(); + interFact.create(dist.getNewOutputPort(), wc.getInputPort()); + interFact.create(wc.getOutputPort(), merger.getNewInputPort()); + addThreadableStage(wc); + } - interFact.create(merger.getOutputPort(), result.getInputPort()); + intraFact.create(merger.getOutputPort(), result.getInputPort()); addThreadableStage(init); - addThreadableStage(wc); + addThreadableStage(merger); } diff --git a/src/test/java/teetime/stage/WordCountingTest.java b/src/test/java/teetime/stage/WordCountingTest.java index 20e90ab2..c83fc26f 100644 --- a/src/test/java/teetime/stage/WordCountingTest.java +++ b/src/test/java/teetime/stage/WordCountingTest.java @@ -12,7 +12,7 @@ public class WordCountingTest { @Test public void test1() { - WordCountingConfiguration wcc = new WordCountingConfiguration(new File("src/test/resources/data/output.txt")); + WordCountingConfiguration wcc = new WordCountingConfiguration(new File("src/test/resources/data/output.txt"), 2); Analysis analysis = new Analysis(wcc); analysis.start(); CountingMap<String> map = wcc.getResult(); -- GitLab