Skip to content
Snippets Groups Projects
Commit 300e0255 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

added connectStages to simplify the usage of CompositeStages;

enhanced the docs for the benchmark so it can be used as example or for
demonstration
parent 8e36ac2c
No related branches found
No related tags found
No related merge requests found
...@@ -18,7 +18,10 @@ package teetime.framework; ...@@ -18,7 +18,10 @@ package teetime.framework;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry; import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection; import teetime.framework.validation.InvalidPortConnection;
...@@ -32,7 +35,8 @@ import teetime.framework.validation.InvalidPortConnection; ...@@ -32,7 +35,8 @@ import teetime.framework.validation.InvalidPortConnection;
@SuppressWarnings("PMD.AbstractNaming") @SuppressWarnings("PMD.AbstractNaming")
public abstract class CompositeStage extends Stage { public abstract class CompositeStage extends Stage {
protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; protected static final IPipeFactory INTRA_PIPE_FACTORY = PipeFactoryRegistry.INSTANCE
.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
protected abstract Stage getFirstStage(); protected abstract Stage getFirstStage();
...@@ -90,4 +94,8 @@ public abstract class CompositeStage extends Stage { ...@@ -90,4 +94,8 @@ public abstract class CompositeStage extends Stage {
super.setOwningThread(owningThread); super.setOwningThread(owningThread);
} }
protected static <T> void connectStages(final OutputPort<? extends T> out, final InputPort<T> in) {
INTRA_PIPE_FACTORY.create(out, in);
}
} }
/**
* Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.stage.string; package teetime.stage.string;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -7,16 +22,12 @@ import teetime.framework.CompositeStage; ...@@ -7,16 +22,12 @@ import teetime.framework.CompositeStage;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Stage; import teetime.framework.Stage;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.MappingCounter; import teetime.stage.MappingCounter;
import teetime.stage.util.CountingMap; import teetime.stage.util.CountingMap;
/** /**
* Intermediate stage, which receives texts and counts the occurring words. * Intermediate stage, which receives texts and counts the occurring words.
* The result is passed on upon termination. * The result (a {@link CountingMap}) is passed on upon termination.
* *
* @since 1.1 * @since 1.1
* *
...@@ -25,17 +36,18 @@ import teetime.stage.util.CountingMap; ...@@ -25,17 +36,18 @@ import teetime.stage.util.CountingMap;
*/ */
public class WordCounter extends CompositeStage { public class WordCounter extends CompositeStage {
// This fields are needed for the methods to work.
private final Tokenizer tokenizer = new Tokenizer(" "); private final Tokenizer tokenizer = new Tokenizer(" ");
private final MappingCounter<String> mapCounter = new MappingCounter<String>(); private final MappingCounter<String> mapCounter = new MappingCounter<String>();
private final ArrayList<Stage> lastStages = new ArrayList<Stage>(); private final ArrayList<Stage> lastStages = new ArrayList<Stage>();
// The connection of the different stages is realized within the construction of a instance of this class.
public WordCounter() { public WordCounter() {
lastStages.add(mapCounter); lastStages.add(mapCounter);
IPipeFactory pipeFact = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
ToLowerCase toLowerCase = new ToLowerCase(); ToLowerCase toLowerCase = new ToLowerCase();
pipeFact.create(tokenizer.getOutputPort(), toLowerCase.getInputPort()); connectStages(tokenizer.getOutputPort(), toLowerCase.getInputPort());
pipeFact.create(toLowerCase.getOutputPort(), mapCounter.getInputPort()); connectStages(toLowerCase.getOutputPort(), mapCounter.getInputPort());
} }
@Override @Override
......
...@@ -39,6 +39,7 @@ public class CountingMap<T> extends HashMap<T, Integer> { ...@@ -39,6 +39,7 @@ public class CountingMap<T> extends HashMap<T, Integer> {
* Increments the value of key by one. * Increments the value of key by one.
* *
* @param key * @param key
* The key which sould be incremented
*/ */
public void increment(final T key) { public void increment(final T key) {
if (super.containsKey(key)) { if (super.containsKey(key)) {
......
wiki @ 63ccbbc8
Subproject commit a93581905ef7b0584d52eae1898148ffa6201a31 Subproject commit 63ccbbc87bd2c0e6599ca91502149dba3cfb99de
/**
* Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.stage; package teetime.stage;
import java.io.File; import java.io.File;
...@@ -12,40 +27,63 @@ import teetime.stage.io.File2ByteArray; ...@@ -12,40 +27,63 @@ import teetime.stage.io.File2ByteArray;
import teetime.stage.string.WordCounter; import teetime.stage.string.WordCounter;
import teetime.stage.util.CountingMap; import teetime.stage.util.CountingMap;
/**
* A simple configuration, which counts the words of a set of files.
* The execution of this configuration is demonstrated in {@link WordCountingTest}.
*
* This configuration is divided into three parts. The first part reads files and distributes them to different {@link WordCounter} instances.
* The second part are a certain number of WordCounter instances. On construction of this class the number of concurrent WordCounter instances is specified with the
* threads parameter.
* The third and last part collects the results from all WordCounter instances and merges them. This final result can be read afterwards.
*
*
* @author Nelson Tavares de Sousa
*
*/
public class WordCountingConfiguration extends AnalysisConfiguration { public class WordCountingConfiguration extends AnalysisConfiguration {
// Last stage is saved as field, to retrieve the result after execution.
private final CountingMapMerger<String> result = new CountingMapMerger<String>(); private final CountingMapMerger<String> result = new CountingMapMerger<String>();
public WordCountingConfiguration(final int threads, final File... input) { public WordCountingConfiguration(final int threads, final File... input) {
// First part of the config
final InitialElementProducer<File> init = new InitialElementProducer<File>(input); final InitialElementProducer<File> init = new InitialElementProducer<File>(input);
final File2ByteArray f2b = new File2ByteArray(); final File2ByteArray f2b = new File2ByteArray();
final ByteArray2String b2s = new ByteArray2String(); final ByteArray2String b2s = new ByteArray2String();
final Distributor<String> dist = new Distributor<String>(); final Distributor<String> dist = new Distributor<String>();
// last part
final Merger<CountingMap<String>> merger = new Merger<CountingMap<String>>(); final Merger<CountingMap<String>> merger = new Merger<CountingMap<String>>();
// result // CountingMapMerger (already as field)
// PipeFactory instaces for intra- and inter-thread communication
IPipeFactory interFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); IPipeFactory interFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); IPipeFactory intraFact = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
// Connecting the stages of the first part of the config
intraFact.create(init.getOutputPort(), f2b.getInputPort()); intraFact.create(init.getOutputPort(), f2b.getInputPort());
intraFact.create(f2b.getOutputPort(), b2s.getInputPort()); intraFact.create(f2b.getOutputPort(), b2s.getInputPort());
intraFact.create(b2s.getOutputPort(), dist.getInputPort()); intraFact.create(b2s.getOutputPort(), dist.getInputPort());
// scale // Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages
WordCounter wc; WordCounter wc;
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
wc = new WordCounter(); wc = new WordCounter();
interFact.create(dist.getNewOutputPort(), wc.getInputPort()); interFact.create(dist.getNewOutputPort(), wc.getInputPort());
interFact.create(wc.getOutputPort(), merger.getNewInputPort()); interFact.create(wc.getOutputPort(), merger.getNewInputPort());
// Add WordCounter as a threadable stage, so it runs in its own thread
addThreadableStage(wc); addThreadableStage(wc);
} }
// Connect the stages of the last part
intraFact.create(merger.getOutputPort(), result.getInputPort()); intraFact.create(merger.getOutputPort(), result.getInputPort());
// Add the first and last part to the threadable stages
addThreadableStage(init); addThreadableStage(init);
addThreadableStage(merger); addThreadableStage(merger);
} }
// Further methods are allowed. For e.g. it is possible to read data from certain stages.
public CountingMap<String> getResult() { public CountingMap<String> getResult() {
return result.getResult(); return result.getResult();
} }
......
/**
* Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.stage; package teetime.stage;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
...@@ -11,9 +26,9 @@ import teetime.stage.util.CountingMap; ...@@ -11,9 +26,9 @@ import teetime.stage.util.CountingMap;
public class WordCountingTest { public class WordCountingTest {
@Test(timeout = 3000) @Test
public void test1() { public void test1() {
int threads = 2; int threads = 6;
WordCountingConfiguration wcc = new WordCountingConfiguration(threads, new File("src/test/resources/data/output.txt"), new File( WordCountingConfiguration wcc = new WordCountingConfiguration(threads, new File("src/test/resources/data/output.txt"), new File(
"src/test/resources/data/output.txt")); "src/test/resources/data/output.txt"));
Analysis analysis = new Analysis(wcc); Analysis analysis = new Analysis(wcc);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment