From 300e02554297c38cdebd9b4facd01602d451397a Mon Sep 17 00:00:00 2001 From: Nelson Tavares de Sousa <ntd@informatik.uni-kiel.de> Date: Wed, 18 Feb 2015 13:42:36 +0100 Subject: [PATCH] added connectStages to simplify the usage of CompositeStages; enhanced the docs for the benchmark so it can be used as example or for demonstration --- .../teetime/framework/CompositeStage.java | 10 ++++- .../teetime/stage/string/WordCounter.java | 28 +++++++++---- .../java/teetime/stage/util/CountingMap.java | 1 + src/site/markdown/wiki | 2 +- .../stage/WordCountingConfiguration.java | 42 ++++++++++++++++++- .../java/teetime/stage/WordCountingTest.java | 19 ++++++++- 6 files changed, 88 insertions(+), 14 deletions(-) diff --git a/src/main/java/teetime/framework/CompositeStage.java b/src/main/java/teetime/framework/CompositeStage.java index 7beee244..d5c4986c 100644 --- a/src/main/java/teetime/framework/CompositeStage.java +++ b/src/main/java/teetime/framework/CompositeStage.java @@ -18,7 +18,10 @@ package teetime.framework; import java.util.Collection; import java.util.List; +import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; @@ -32,7 +35,8 @@ import teetime.framework.validation.InvalidPortConnection; @SuppressWarnings("PMD.AbstractNaming") public abstract class CompositeStage extends Stage { - protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; + protected static final IPipeFactory INTRA_PIPE_FACTORY = PipeFactoryRegistry.INSTANCE + .getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); protected abstract Stage getFirstStage(); @@ -90,4 +94,8 @@ public abstract class CompositeStage extends Stage { super.setOwningThread(owningThread); } + protected static <T> void connectStages(final OutputPort<? extends T> out, final InputPort<T> in) { + INTRA_PIPE_FACTORY.create(out, in); + } + } diff --git a/src/main/java/teetime/stage/string/WordCounter.java b/src/main/java/teetime/stage/string/WordCounter.java index e453a463..0266f044 100644 --- a/src/main/java/teetime/stage/string/WordCounter.java +++ b/src/main/java/teetime/stage/string/WordCounter.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package teetime.stage.string; import java.util.ArrayList; @@ -7,16 +22,12 @@ import teetime.framework.CompositeStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.Stage; -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.MappingCounter; import teetime.stage.util.CountingMap; /** * Intermediate stage, which receives texts and counts the occurring words. - * The result is passed on upon termination. + * The result (a {@link CountingMap}) is passed on upon termination. * * @since 1.1 * @@ -25,17 +36,18 @@ import teetime.stage.util.CountingMap; */ public class WordCounter extends CompositeStage { + // This fields are needed for the methods to work. private final Tokenizer tokenizer = new Tokenizer(" "); private final MappingCounter<String> mapCounter = new MappingCounter<String>(); private final ArrayList<Stage> lastStages = new ArrayList<Stage>(); + // The connection of the different stages is realized within the construction of a instance of this class. public WordCounter() { lastStages.add(mapCounter); - IPipeFactory pipeFact = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); ToLowerCase toLowerCase = new ToLowerCase(); - pipeFact.create(tokenizer.getOutputPort(), toLowerCase.getInputPort()); - pipeFact.create(toLowerCase.getOutputPort(), mapCounter.getInputPort()); + connectStages(tokenizer.getOutputPort(), toLowerCase.getInputPort()); + connectStages(toLowerCase.getOutputPort(), mapCounter.getInputPort()); } @Override diff --git a/src/main/java/teetime/stage/util/CountingMap.java b/src/main/java/teetime/stage/util/CountingMap.java index b12bff76..1c995c28 100644 --- a/src/main/java/teetime/stage/util/CountingMap.java +++ b/src/main/java/teetime/stage/util/CountingMap.java @@ -39,6 +39,7 @@ public class CountingMap<T> extends HashMap<T, Integer> { * Increments the value of key by one. * * @param key + * The key which sould be incremented */ public void increment(final T key) { if (super.containsKey(key)) { diff --git a/src/site/markdown/wiki b/src/site/markdown/wiki index a9358190..63ccbbc8 160000 --- a/src/site/markdown/wiki +++ b/src/site/markdown/wiki @@ -1 +1 @@ -Subproject commit a93581905ef7b0584d52eae1898148ffa6201a31 +Subproject commit 63ccbbc87bd2c0e6599ca91502149dba3cfb99de diff --git a/src/test/java/teetime/stage/WordCountingConfiguration.java b/src/test/java/teetime/stage/WordCountingConfiguration.java index 10297520..31537566 100644 --- a/src/test/java/teetime/stage/WordCountingConfiguration.java +++ b/src/test/java/teetime/stage/WordCountingConfiguration.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package teetime.stage; import java.io.File; @@ -12,40 +27,63 @@ import teetime.stage.io.File2ByteArray; import teetime.stage.string.WordCounter; import teetime.stage.util.CountingMap; +/** + * A simple configuration, which counts the words of a set of files. + * The execution of this configuration is demonstrated in {@link WordCountingTest}. + * + * This configuration is divided into three parts. The first part reads files and distributes them to different {@link WordCounter} instances. + * The second part are a certain number of WordCounter instances. On construction of this class the number of concurrent WordCounter instances is specified with the + * threads parameter. + * The third and last part collects the results from all WordCounter instances and merges them. This final result can be read afterwards. + * + * + * @author Nelson Tavares de Sousa + * + */ public class WordCountingConfiguration extends AnalysisConfiguration { + // Last stage is saved as field, to retrieve the result after execution. private final CountingMapMerger<String> result = new CountingMapMerger<String>(); public WordCountingConfiguration(final int threads, final File... input) { + // First part of the config final InitialElementProducer<File> init = new InitialElementProducer<File>(input); final File2ByteArray f2b = new File2ByteArray(); final ByteArray2String b2s = new ByteArray2String(); final Distributor<String> dist = new Distributor<String>(); + // last part final Merger<CountingMap<String>> merger = new Merger<CountingMap<String>>(); - // result + // CountingMapMerger (already as field) + + // PipeFactory instaces for intra- and inter-thread communication IPipeFactory interFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + // Connecting the stages of the first part of the config intraFact.create(init.getOutputPort(), f2b.getInputPort()); intraFact.create(f2b.getOutputPort(), b2s.getInputPort()); intraFact.create(b2s.getOutputPort(), dist.getInputPort()); - // scale + // Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages WordCounter wc; for (int i = 0; i < threads; i++) { wc = new WordCounter(); interFact.create(dist.getNewOutputPort(), wc.getInputPort()); interFact.create(wc.getOutputPort(), merger.getNewInputPort()); + // Add WordCounter as a threadable stage, so it runs in its own thread addThreadableStage(wc); } + // Connect the stages of the last part intraFact.create(merger.getOutputPort(), result.getInputPort()); + // Add the first and last part to the threadable stages addThreadableStage(init); addThreadableStage(merger); } + // Further methods are allowed. For e.g. it is possible to read data from certain stages. public CountingMap<String> getResult() { return result.getResult(); } diff --git a/src/test/java/teetime/stage/WordCountingTest.java b/src/test/java/teetime/stage/WordCountingTest.java index c1df1ba9..5c033b7a 100644 --- a/src/test/java/teetime/stage/WordCountingTest.java +++ b/src/test/java/teetime/stage/WordCountingTest.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package teetime.stage; import static org.junit.Assert.assertEquals; @@ -11,9 +26,9 @@ import teetime.stage.util.CountingMap; public class WordCountingTest { - @Test(timeout = 3000) + @Test public void test1() { - int threads = 2; + int threads = 6; WordCountingConfiguration wcc = new WordCountingConfiguration(threads, new File("src/test/resources/data/output.txt"), new File( "src/test/resources/data/output.txt")); Analysis analysis = new Analysis(wcc); -- GitLab