diff --git a/.gitignore b/.gitignore index 952e77c1385599de1eb326d2438715f38357e347..f070d111b06416efc88d69f2196138665ef7147f 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ /target .DS_Store teetime.log +/src/main/resources/hugetext.txt diff --git a/src/main/java/teetime/framework/MonitoringThread.java b/src/main/java/teetime/framework/MonitoringThread.java index 31905af97fc79d99e87d853566319679520b5442..94c3b489475131bfeda13cf3e57fa335975dfbd0 100644 --- a/src/main/java/teetime/framework/MonitoringThread.java +++ b/src/main/java/teetime/framework/MonitoringThread.java @@ -21,21 +21,23 @@ import java.util.List; import org.slf4j.LoggerFactory; import teetime.framework.pipe.IMonitorablePipe; -import teetime.framework.pipe.IPipe; public class MonitoringThread extends Thread { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MonitoringThread.class); - private final List<IMonitorablePipe> monitoredPipes = new ArrayList<IMonitorablePipe>(); + // private final List<IMonitorablePipe> monitoredPipes = new ArrayList<IMonitorablePipe>(); private volatile boolean terminated; + private final List<AbstractPort<?>> monitoredPorts = new ArrayList<AbstractPort<?>>(); + @Override public void run() { while (!terminated) { - for (final IMonitorablePipe pipe : monitoredPipes) { + for (final AbstractPort<?> port : monitoredPorts) { + IMonitorablePipe pipe = (IMonitorablePipe) port.getPipe(); final long pushThroughput = pipe.getPushThroughput(); final long pullThroughput = pipe.getPullThroughput(); final double ratio = (double) pushThroughput / pullThroughput; @@ -54,12 +56,12 @@ public class MonitoringThread extends Thread { } } - public void addPipe(final IPipe pipe) { - if (!(pipe instanceof IMonitorablePipe)) { - throw new IllegalArgumentException("The given pipe does not implement IMonitorablePipe"); - } - monitoredPipes.add((IMonitorablePipe) pipe); - } + // public void addPipe(final IPipe pipe) { + // if (!(pipe instanceof IMonitorablePipe)) { + // 
throw new IllegalArgumentException("The given pipe does not implement IMonitorablePipe"); + // } + // monitoredPipes.add((IMonitorablePipe) pipe); + // } /** * Sets the <code>terminated</code> flag and interrupts this thread. @@ -69,4 +71,8 @@ public class MonitoringThread extends Thread { interrupt(); } + public void addPort(final InputPort<String> inputPort) { + monitoredPorts.add(inputPort); + } + } diff --git a/src/main/java/teetime/framework/exceptionHandling/TerminateException.java b/src/main/java/teetime/framework/exceptionHandling/TerminateException.java index a05e2674f95cfd97a046702bae6110e837f620fe..47581a810a16c8872f955b9a3a9248808edf7cc7 100644 --- a/src/main/java/teetime/framework/exceptionHandling/TerminateException.java +++ b/src/main/java/teetime/framework/exceptionHandling/TerminateException.java @@ -19,10 +19,10 @@ import teetime.util.StacklessException; /** * Represents an exception that is used to terminate the running thread. - * + * * @since 1.1 */ -public class TerminateException extends StacklessException { +public final class TerminateException extends StacklessException { public static final TerminateException INSTANCE = new TerminateException("Framework Exception"); diff --git a/src/main/java/teetime/stage/MappingCounter.java b/src/main/java/teetime/stage/MappingCounter.java index d36785669297296a58b61c5e7f6179f4bfd40207..587ed7d1c00a10032a02c46a6c471f3e9430afdc 100644 --- a/src/main/java/teetime/stage/MappingCounter.java +++ b/src/main/java/teetime/stage/MappingCounter.java @@ -35,14 +35,9 @@ public final class MappingCounter<T> extends AbstractConsumerStage<T> { private final CountingMap<T> counter = new CountingMap<T>(); private final OutputPort<CountingMap<T>> port = createOutputPort(); - public MappingCounter() { - - } - @Override protected void execute(final T element) { counter.increment(element); - } @Override diff --git a/src/main/java/teetime/stage/string/ToLowerCase.java b/src/main/java/teetime/stage/string/ToLowerCase.java 
index 9a6efdd0a28d14855964da96f88e300639f70116..f15c17513ea4a7f0ff6a386a89d4d6b3c131f621 100644 --- a/src/main/java/teetime/stage/string/ToLowerCase.java +++ b/src/main/java/teetime/stage/string/ToLowerCase.java @@ -15,6 +15,8 @@ */ package teetime.stage.string; +import java.util.Locale; + import teetime.framework.AbstractConsumerStage; import teetime.framework.OutputPort; @@ -31,8 +33,7 @@ public final class ToLowerCase extends AbstractConsumerStage<String> { @Override protected void execute(final String element) { - this.outputPort.send(element.toLowerCase()); - + this.outputPort.send(element.toLowerCase(Locale.ENGLISH)); } public OutputPort<String> getOutputPort() { diff --git a/src/main/java/teetime/stage/string/WordCounter.java b/src/main/java/teetime/stage/string/WordCounter.java index f7d5298216b93ded867e36b87d7890fd8216ed03..cef131b0b4f91e135030e5b758be336c1108cc55 100644 --- a/src/main/java/teetime/stage/string/WordCounter.java +++ b/src/main/java/teetime/stage/string/WordCounter.java @@ -36,7 +36,6 @@ public final class WordCounter extends AbstractCompositeStage { private final MappingCounter<String> mapCounter; public WordCounter() { - this.tokenizer = new Tokenizer(" "); final ToLowerCase toLowerCase = new ToLowerCase(); this.mapCounter = new MappingCounter<String>(); diff --git a/src/main/resources/hugetext.zip b/src/main/resources/hugetext.zip new file mode 100644 index 0000000000000000000000000000000000000000..1373ce2e9f8356f7308e61a6efb8687ae5021a68 Binary files /dev/null and b/src/main/resources/hugetext.zip differ diff --git a/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java b/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..034a2c7d5a54ea657687ed2e1f98d8270f8bfb52 --- /dev/null +++ b/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java @@ -0,0 +1,123 @@ +/** + * Copyright (C) 2015 TeeTime 
(http://teetime.sourceforge.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package teetime.examples.wordcounter; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import teetime.framework.AbstractPort; +import teetime.framework.Configuration; +import teetime.framework.MonitoringThread; +import teetime.stage.CountingMapMerger; +import teetime.stage.InitialElementProducer; +import teetime.stage.basic.distributor.Distributor; +import teetime.stage.basic.distributor.strategy.RoundRobinStrategy2; +import teetime.stage.basic.merger.Merger; +import teetime.stage.io.File2SeqOfWords; +import teetime.stage.string.WordCounter; +import teetime.stage.util.CountingMap; + +/** + * A simple configuration, which counts the words of a set of files. + * The execution of this configuration is demonstrated in {@link WordCountingTest}. + * + * This configuration is divided into three parts. The first part reads files and distributes them to different {@link WordCounter} instances. + * The second part consists of a certain number of WordCounter instances. On construction of this class, the number of concurrent WordCounter instances is specified with the + * threads parameter. + * The third and last part collects the results from all WordCounter instances and merges them. This final result can be read afterwards. 
+ * + * + * @author Nelson Tavares de Sousa + * + */ +public class WordCounterConfiguration extends Configuration { + + // Last stage is saved as field, to retrieve the result after execution. + private final CountingMapMerger<String> result = new CountingMapMerger<String>(); + + private final List<AbstractPort<?>> distributorPorts = new ArrayList<AbstractPort<?>>(); + private final List<AbstractPort<?>> mergerPorts = new ArrayList<AbstractPort<?>>(); + + private final MonitoringThread monitoringThread; + + private final Distributor<String> distributor; + + public WordCounterConfiguration(final int threads, final File... input) { + // First part of the config + final InitialElementProducer<File> init = new InitialElementProducer<File>(input); + // final File2Lines f2b = new File2Lines(); + final File2SeqOfWords f2b = new File2SeqOfWords("UTF-8", 512); + distributor = new Distributor<String>(new RoundRobinStrategy2()); + + // last part + final Merger<CountingMap<String>> merger = new Merger<CountingMap<String>>(); + // CountingMapMerger (already as field) + + // Connecting the stages of the first part of the config + connectPorts(init.getOutputPort(), f2b.getInputPort()); + connectPorts(f2b.getOutputPort(), distributor.getInputPort()); + + monitoringThread = new MonitoringThread(); + + // Middle part... 
multiple instances of WordCounter are created and connected to the merger and distributor stages + for (int i = 0; i < threads; i++) { + // final InputPortSizePrinter<String> inputPortSizePrinter = new InputPortSizePrinter<String>(); + final WordCounter wc = new WordCounter(); + // intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort()); + final WordCounter threadableStage = wc; + + connectPorts(distributor.getNewOutputPort(), threadableStage.getInputPort(), 1000); + connectPorts(wc.getOutputPort(), merger.getNewInputPort()); + // Add WordCounter as a threadable stage, so it runs in its own thread + addThreadableStage(threadableStage.getInputPort().getOwningStage()); + + distributorPorts.add(threadableStage.getInputPort()); + mergerPorts.add(wc.getOutputPort()); + + monitoringThread.addPort(threadableStage.getInputPort()); + } + + // Connect the stages of the last part + connectPorts(merger.getOutputPort(), result.getInputPort()); + + // Add the first and last part to the threadable stages + addThreadableStage(init); + addThreadableStage(merger); + } + + public MonitoringThread getMonitoringThread() { + return monitoringThread; + } + + // Further methods are allowed. For example, it is possible to read data from certain stages. 
+ public CountingMap<String> getResult() { + return result.getResult(); + } + + public List<AbstractPort<?>> getDistributorPorts() { + return distributorPorts; + } + + public List<AbstractPort<?>> getMergerPorts() { + return mergerPorts; + } + + public Distributor<String> getDistributor() { + return distributor; + } + +} diff --git a/src/test/java/teetime/examples/wordcounter/WordCounterTest.java b/src/test/java/teetime/examples/wordcounter/WordCounterTest.java new file mode 100644 index 0000000000000000000000000000000000000000..ae65d4fdd8fc061719eb93c7207cc5a7c56b8d56 --- /dev/null +++ b/src/test/java/teetime/examples/wordcounter/WordCounterTest.java @@ -0,0 +1,111 @@ +package teetime.examples.wordcounter; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import teetime.framework.AbstractPort; +import teetime.framework.Execution; +import teetime.framework.pipe.IMonitorablePipe; +import teetime.stage.basic.distributor.strategy.RoundRobinStrategy2; +import teetime.stage.util.CountingMap; +import teetime.util.StopWatch; + +import com.google.common.base.Joiner; +import com.google.common.primitives.Longs; + +public class WordCounterTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(WordCounterTest.class); + + public static void writeTimingsToFile(final File outputFile, final long[] timings) throws UnsupportedEncodingException, FileNotFoundException { + final PrintStream ps = new PrintStream(new BufferedOutputStream(new FileOutputStream(outputFile, true), 8192 * 8), false, "UTF-8"); + try { + final Joiner joiner = com.google.common.base.Joiner.on(' '); + final String timingsString = joiner.join(Longs.asList(timings)); + ps.println(timingsString); + } finally { + ps.close(); + } + } + + 
public static void main(final String[] args) throws UnsupportedEncodingException, FileNotFoundException { + int numWorkerThreads; + try { + numWorkerThreads = Integer.valueOf(args[0]); + } catch (final NumberFormatException e) { + numWorkerThreads = 3; + } + LOGGER.info("# worker threads: " + numWorkerThreads); + + int numWarmUps; + try { + numWarmUps = Integer.valueOf(args[1]); + } catch (final NumberFormatException e) { + numWarmUps = 1; + } + LOGGER.info("# warm ups: " + numWarmUps); + + final long[] timings = new long[1]; + + final String fileName = args[2]; + final File testFile = new File(fileName); + + final StopWatch stopWatch = new StopWatch(); + + for (int i = 0; i < numWarmUps; i++) { + LOGGER.info("Warm up #" + i); + final WordCounterConfiguration wcc = new WordCounterConfiguration(numWorkerThreads, testFile); + final Execution<?> analysis = new Execution<WordCounterConfiguration>(wcc); + + stopWatch.start(); + analysis.executeBlocking(); + stopWatch.end(); + + LOGGER.info("duration: " + TimeUnit.NANOSECONDS.toSeconds(stopWatch.getDurationInNs()) + " secs"); + } + + LOGGER.info("Starting analysis..."); + final WordCounterConfiguration wcc = new WordCounterConfiguration(numWorkerThreads, testFile); + final Execution<?> analysis = new Execution<WordCounterConfiguration>(wcc); + + wcc.getMonitoringThread().start(); + stopWatch.start(); + analysis.executeBlocking(); + stopWatch.end(); + wcc.getMonitoringThread().terminate(); + + LOGGER.info("duration: " + TimeUnit.NANOSECONDS.toSeconds(stopWatch.getDurationInNs()) + " secs"); + timings[0] = stopWatch.getDurationInNs(); + + // System.out.println("exceptions: " + exceptions); + + final CountingMap<String> map = wcc.getResult(); + System.out.println("vero: " + (map.get("vero") == 3813850) + "->" + map.get("vero") + " should be " + 3813850); + System.out.println("sit: " + (map.get("sit") == 7627700) + "->" + map.get("sit") + " should be " + 7627700); + + final File outputFile = new File("timings.txt"); + 
writeTimingsToFile(outputFile, timings); + + System.out.println("distributor pipes:"); + for (final AbstractPort<?> port : wcc.getDistributorPorts()) { + final IMonitorablePipe spscPipe = (IMonitorablePipe) port.getPipe(); + System.out.println("numWaits: " + spscPipe.getNumWaits()); + } + + System.out.println("merger pipes:"); + for (final AbstractPort<?> port : wcc.getMergerPorts()) { + final IMonitorablePipe spscPipe = (IMonitorablePipe) port.getPipe(); + System.out.println("numWaits: " + spscPipe.getNumWaits()); + } + + System.out.println("distributor waits: " + ((RoundRobinStrategy2) wcc.getDistributor().getStrategy()).getNumWaits()); + } +}