Skip to content
Snippets Groups Projects
Commit 24b51e62 authored by Christian Wulf's avatar Christian Wulf
Browse files

Merge branch 'benchmark' into 'master'

Benchmark

Contains:

* the benchmark, as discussed
* a test for the benchmark (without testing data loss)
* a protected static method in CompositeStage, to connect stages
* final declaration of all classes
* signal-bug fix in Merger
* new images in the download page
* EveryXthPrinter extends CompositeStage
* moved util.test

See merge request !28
parents 2d7466b4 075f6d54
Branches
Tags
No related merge requests found
Showing
with 298 additions and 50 deletions
...@@ -22,7 +22,7 @@ import teetime.framework.AbstractConsumerStage; ...@@ -22,7 +22,7 @@ import teetime.framework.AbstractConsumerStage;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
public class ElementDelayMeasuringStage<T> extends AbstractConsumerStage<T> { public final class ElementDelayMeasuringStage<T> extends AbstractConsumerStage<T> {
private final InputPort<Long> triggerInputPort = this.createInputPort(); private final InputPort<Long> triggerInputPort = this.createInputPort();
private final OutputPort<T> outputPort = this.createOutputPort(); private final OutputPort<T> outputPort = this.createOutputPort();
......
...@@ -23,7 +23,7 @@ import teetime.framework.AbstractConsumerStage; ...@@ -23,7 +23,7 @@ import teetime.framework.AbstractConsumerStage;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
public class ElementThroughputMeasuringStage<T> extends AbstractConsumerStage<T> { public final class ElementThroughputMeasuringStage<T> extends AbstractConsumerStage<T> {
private final InputPort<Long> triggerInputPort = this.createInputPort(); private final InputPort<Long> triggerInputPort = this.createInputPort();
private final OutputPort<T> outputPort = this.createOutputPort(); private final OutputPort<T> outputPort = this.createOutputPort();
......
...@@ -17,7 +17,7 @@ package teetime.stage; ...@@ -17,7 +17,7 @@ package teetime.stage;
import teetime.framework.AbstractProducerStage; import teetime.framework.AbstractProducerStage;
public class InitialElementProducer<T> extends AbstractProducerStage<T> { public final class InitialElementProducer<T> extends AbstractProducerStage<T> {
private final T[] elements; private final T[] elements;
......
/**
* Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.stage;
import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort;
import teetime.stage.util.CountingMap;
/**
* This counts how many of different elements are sent to this stage. Nothing is forwarded.
* On termination a CountingMap is sent to its outputport.
*
* @since 1.1
*
* @author Nelson Tavares de Sousa
*
* @param <T>
* Type to be count
*/
public final class MappingCounter<T> extends AbstractConsumerStage<T> {
private final CountingMap<T> counter = new CountingMap<T>();
private final OutputPort<CountingMap<T>> port = createOutputPort();
public MappingCounter() {
}
@Override
protected void execute(final T element) {
counter.increment(element);
}
@Override
public void onTerminating() throws Exception {
port.send(counter);
super.onTerminating();
}
public OutputPort<CountingMap<T>> getOutputPort() {
return port;
}
}
...@@ -30,7 +30,7 @@ import teetime.framework.signal.ISignal; ...@@ -30,7 +30,7 @@ import teetime.framework.signal.ISignal;
* This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port. * This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port.
* For its signal handling behavior see {@link #onSignal(ISignal, InputPort)} * For its signal handling behavior see {@link #onSignal(ISignal, InputPort)}
* *
* @author Christian Wulf * @author Christian Wulf, Nelson Tavares de Sousa
* *
* @since 1.0 * @since 1.0
* *
...@@ -43,7 +43,7 @@ public final class Merger<T> extends AbstractStage { ...@@ -43,7 +43,7 @@ public final class Merger<T> extends AbstractStage {
private IMergerStrategy strategy; private IMergerStrategy strategy;
private final Map<Class<?>, Set<InputPort<?>>> signalMap = new HashMap<Class<?>, Set<InputPort<?>>>(); private final Map<Class<ISignal>, Set<InputPort<?>>> signalMap = new HashMap<Class<ISignal>, Set<InputPort<?>>>();
public Merger() { public Merger() {
this(new RoundRobinStrategy()); this(new RoundRobinStrategy());
...@@ -74,25 +74,29 @@ public final class Merger<T> extends AbstractStage { ...@@ -74,25 +74,29 @@ public final class Merger<T> extends AbstractStage {
* @param inputPort * @param inputPort
* The port which the signal was sent to * The port which the signal was sent to
*/ */
@SuppressWarnings("unchecked")
@Override @Override
public void onSignal(final ISignal signal, final InputPort<?> inputPort) { public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
this.logger.trace("Got signal: " + signal + " from input port: " + inputPort); this.logger.warn("Got signal: " + signal + " from input port: " + inputPort);
if (signalMap.containsKey(signal.getClass())) { Class<? extends ISignal> signalClass = signal.getClass();
Set<InputPort<?>> set = signalMap.get(signal.getClass());
if (signalMap.containsKey(signalClass)) {
Set<InputPort<?>> set = signalMap.get(signalClass);
if (!set.add(inputPort)) { if (!set.add(inputPort)) {
this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort); this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
} }
if (set.size() == this.getInputPorts().length) {
this.outputPort.sendSignal(signal);
signalMap.remove(signal.getClass());
}
} else { } else {
signal.trigger(this);
Set<InputPort<?>> tempSet = new HashSet<InputPort<?>>(); Set<InputPort<?>> tempSet = new HashSet<InputPort<?>>();
tempSet.add(inputPort); tempSet.add(inputPort);
signalMap.put(signal.getClass(), tempSet); signalMap.put((Class<ISignal>) signalClass, tempSet);
}
if (signalMap.get(signalClass).size() == this.getInputPorts().length) {
signal.trigger(this);
this.outputPort.sendSignal(signal);
signalMap.remove(signalClass);
} }
} }
......
...@@ -23,7 +23,7 @@ import teetime.framework.AbstractConsumerStage; ...@@ -23,7 +23,7 @@ import teetime.framework.AbstractConsumerStage;
import com.google.common.io.Files; import com.google.common.io.Files;
public class ByteArrayFileWriter extends AbstractConsumerStage<byte[]> { public final class ByteArrayFileWriter extends AbstractConsumerStage<byte[]> {
private final File file; private final File file;
private FileOutputStream fo; private FileOutputStream fo;
......
...@@ -28,7 +28,7 @@ import teetime.framework.OutputPort; ...@@ -28,7 +28,7 @@ import teetime.framework.OutputPort;
* *
* @since 1.10 * @since 1.10
*/ */
public class Directory2FilesFilter extends AbstractConsumerStage<File> { public final class Directory2FilesFilter extends AbstractConsumerStage<File> {
private final OutputPort<File> outputPort = this.createOutputPort(); private final OutputPort<File> outputPort = this.createOutputPort();
......
...@@ -15,43 +15,38 @@ ...@@ -15,43 +15,38 @@
*/ */
package teetime.stage.io; package teetime.stage.io;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import teetime.framework.CompositeStage;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Stage; import teetime.framework.Stage;
import teetime.framework.TerminationStrategy; import teetime.framework.TerminationStrategy;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection; import teetime.framework.validation.InvalidPortConnection;
import teetime.stage.EveryXthStage; import teetime.stage.EveryXthStage;
import teetime.stage.basic.distributor.CopyByReferenceStrategy; import teetime.stage.basic.distributor.CopyByReferenceStrategy;
import teetime.stage.basic.distributor.Distributor; import teetime.stage.basic.distributor.Distributor;
public final class EveryXthPrinter<T> extends Stage { public final class EveryXthPrinter<T> extends CompositeStage {
private final Distributor<T> distributor; private final Distributor<T> distributor;
private final List<Stage> lastStages = new ArrayList<Stage>();
public EveryXthPrinter(final int threshold) { public EveryXthPrinter(final int threshold) {
distributor = new Distributor<T>(); distributor = new Distributor<T>();
EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold); EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold);
Printer<Integer> printer = new Printer<Integer>(); Printer<Integer> printer = new Printer<Integer>();
IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); connectStages(distributor.getNewOutputPort(), everyXthStage.getInputPort());
pipeFactory.create(distributor.getNewOutputPort(), everyXthStage.getInputPort()); connectStages(everyXthStage.getOutputPort(), printer.getInputPort());
pipeFactory.create(everyXthStage.getOutputPort(), printer.getInputPort());
lastStages.add(printer);
distributor.setStrategy(new CopyByReferenceStrategy()); distributor.setStrategy(new CopyByReferenceStrategy());
} }
@Override
protected void executeWithPorts() {
distributor.executeWithPorts();
}
@Override @Override
public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
distributor.validateOutputPorts(invalidPortConnections); distributor.validateOutputPorts(invalidPortConnections);
...@@ -95,4 +90,14 @@ public final class EveryXthPrinter<T> extends Stage { ...@@ -95,4 +90,14 @@ public final class EveryXthPrinter<T> extends Stage {
return distributor.isStarted(); return distributor.isStarted();
} }
@Override
protected Stage getFirstStage() {
return distributor;
}
@Override
protected Collection<? extends Stage> getLastStages() {
return lastStages;
}
} }
...@@ -23,7 +23,7 @@ import teetime.framework.OutputPort; ...@@ -23,7 +23,7 @@ import teetime.framework.OutputPort;
import com.google.common.io.Files; import com.google.common.io.Files;
public class File2ByteArray extends AbstractConsumerStage<File> { public final class File2ByteArray extends AbstractConsumerStage<File> {
private final OutputPort<byte[]> outputPort = this.createOutputPort(); private final OutputPort<byte[]> outputPort = this.createOutputPort();
...@@ -41,13 +41,4 @@ public class File2ByteArray extends AbstractConsumerStage<File> { ...@@ -41,13 +41,4 @@ public class File2ByteArray extends AbstractConsumerStage<File> {
return this.outputPort; return this.outputPort;
} }
@Override
public boolean shouldBeTerminated() {
return false;
}
@Override
public void terminate() {
}
} }
...@@ -30,7 +30,7 @@ import teetime.stage.util.TextLine; ...@@ -30,7 +30,7 @@ import teetime.stage.util.TextLine;
* @author Christian Wulf * @author Christian Wulf
* *
*/ */
public class File2TextLinesFilter extends AbstractConsumerStage<File> { public final class File2TextLinesFilter extends AbstractConsumerStage<File> {
private final OutputPort<TextLine> outputPort = this.createOutputPort(); private final OutputPort<TextLine> outputPort = this.createOutputPort();
......
...@@ -29,7 +29,7 @@ import teetime.framework.AbstractConsumerStage; ...@@ -29,7 +29,7 @@ import teetime.framework.AbstractConsumerStage;
* *
* @since 1.10 * @since 1.10
*/ */
public class Printer<T> extends AbstractConsumerStage<T> { public final class Printer<T> extends AbstractConsumerStage<T> {
public static final String STREAM_STDOUT = "STDOUT"; public static final String STREAM_STDOUT = "STDOUT";
public static final String STREAM_STDERR = "STDERR"; public static final String STREAM_STDERR = "STDERR";
......
/**
* Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.stage.string;
import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort;
/**
* Receives a string and passes it on to the next stage only with lower case letters.
* Punctuation and similar characters will be removed. Only [a-zA-Z ] will be passed on.
*
* @since 1.1
*
* @author Nelson Tavares de Sousa
*
*/
public final class ToLowerCase extends AbstractConsumerStage<String> {
private final OutputPort<String> outputPort = this.createOutputPort();
@Override
protected void execute(final String element) {
outputPort.send(element.replaceAll("[^a-zA-Z ]", "").toLowerCase());
}
public OutputPort<String> getOutputPort() {
return outputPort;
}
}
/**
* Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.stage.string;
import java.util.ArrayList;
import java.util.Collection;
import teetime.framework.CompositeStage;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
import teetime.stage.MappingCounter;
import teetime.stage.util.CountingMap;
/**
* Intermediate stage, which receives texts and counts the occurring words.
* The result (a {@link CountingMap}) is passed on upon termination.
*
* @since 1.1
*
* @author Nelson Tavares de Sousa
*
*/
public final class WordCounter extends CompositeStage {
// This fields are needed for the methods to work.
private final Tokenizer tokenizer = new Tokenizer(" ");
private final MappingCounter<String> mapCounter = new MappingCounter<String>();
private final ArrayList<Stage> lastStages = new ArrayList<Stage>();
// The connection of the different stages is realized within the construction of a instance of this class.
public WordCounter() {
lastStages.add(mapCounter);
ToLowerCase toLowerCase = new ToLowerCase();
connectStages(tokenizer.getOutputPort(), toLowerCase.getInputPort());
connectStages(toLowerCase.getOutputPort(), mapCounter.getInputPort());
}
@Override
protected Stage getFirstStage() {
return tokenizer;
}
@Override
protected Collection<? extends Stage> getLastStages() {
return lastStages;
}
public InputPort<String> getInputPort() {
return tokenizer.getInputPort();
}
public OutputPort<CountingMap<String>> getOutputPort() {
return mapCounter.getOutputPort();
}
}
/**
* Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.stage.util;
import java.util.HashMap;
/**
* An implementation of HashMap which can be used to count the occurrence of different keys.
* This conaitns all methods of HashMap, but is enhanched with the {@link #add(T, Integer)} and {@link #increment(T)} methods.
*
* @since 1.1
*
* @author Nelson Tavares de Sousa
*
* @param <T>
* Key type to be count
*/
public final class CountingMap<T> extends HashMap<T, Integer> {
/**
* Generated serialVersionUID
*/
private static final long serialVersionUID = -8036971796701648200L;
/**
* Increments the value of key by one.
*
* @param key
* The key which sould be incremented
*/
public void increment(final T key) {
if (super.containsKey(key)) {
Integer i = super.get(key);
i++;
super.put(key, i);
} else {
super.put(key, 1);
}
}
/**
* Adds i to the value of key.
*
* @param key
* Key which is used to add i.
* @param i
* Integer value to be added.
*/
public void add(final T key, final Integer i) {
if (super.containsKey(key)) {
Integer j = super.get(key);
j += i;
super.put(key, j);
} else {
super.put(key, i);
}
}
}
...@@ -20,7 +20,7 @@ package teetime.stage.util; ...@@ -20,7 +20,7 @@ package teetime.stage.util;
* *
* @since 1.10 * @since 1.10
*/ */
public class MappingException extends Exception { public final class MappingException extends Exception {
private static final long serialVersionUID = 7300752837946139350L; private static final long serialVersionUID = 7300752837946139350L;
......
...@@ -22,7 +22,7 @@ import java.io.File; ...@@ -22,7 +22,7 @@ import java.io.File;
* *
* @since 1.10 * @since 1.10
*/ */
public class TextLine { public final class TextLine {
private final File textFile; private final File textFile;
private final String textLine; private final String textLine;
......
...@@ -25,7 +25,7 @@ import java.util.List; ...@@ -25,7 +25,7 @@ import java.util.List;
* *
* @param <T> * @param <T>
*/ */
public class CyclicListIterator<T> implements Iterator<T> { public final class CyclicListIterator<T> implements Iterator<T> {
private final List<T> list; private final List<T> list;
// private Iterator<T> iterator; // private Iterator<T> iterator;
...@@ -37,10 +37,12 @@ public class CyclicListIterator<T> implements Iterator<T> { ...@@ -37,10 +37,12 @@ public class CyclicListIterator<T> implements Iterator<T> {
// this.iterator = this.list.iterator(); // this.iterator = this.list.iterator();
} }
@Override
public boolean hasNext() { public boolean hasNext() {
return true; return true;
} }
@Override
public T next() { public T next() {
// if (!this.iterator.hasNext()) { // if (!this.iterator.hasNext()) {
// this.iterator = this.list.iterator(); // this.iterator = this.list.iterator();
...@@ -56,6 +58,7 @@ public class CyclicListIterator<T> implements Iterator<T> { ...@@ -56,6 +58,7 @@ public class CyclicListIterator<T> implements Iterator<T> {
return element; return element;
} }
@Override
public void remove() { public void remove() {
// this.iterator.remove(); // this.iterator.remove();
this.currentIndex = this.getCurrentIndex(); this.currentIndex = this.getCurrentIndex();
......
...@@ -24,7 +24,7 @@ import teetime.util.concurrent.hashmap.ValueFactory; ...@@ -24,7 +24,7 @@ import teetime.util.concurrent.hashmap.ValueFactory;
* *
* @since 1.10 * @since 1.10
*/ */
public class HashMapWithDefault<K, V> extends HashMap<K, V> { public final class HashMapWithDefault<K, V> extends HashMap<K, V> {
private static final long serialVersionUID = -7958038532219740472L; private static final long serialVersionUID = -7958038532219740472L;
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
*/ */
package teetime.util; package teetime.util;
public class Pair<F, S> { public final class Pair<F, S> {
private final F first; private final F first;
private final S second; private final S second;
......
...@@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentMap; ...@@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentMap;
* @author Christian Wulf * @author Christian Wulf
* @since 1.11 * @since 1.11
*/ */
public class CachedClassForNameResolver<T> { public final class CachedClassForNameResolver<T> {
private final ConcurrentMap<String, Class<? extends T>> cachedClasses = new ConcurrentHashMap<String, Class<? extends T>>(); // NOCS private final ConcurrentMap<String, Class<? extends T>> cachedClasses = new ConcurrentHashMap<String, Class<? extends T>>(); // NOCS
private final ClassForNameResolver<T> classForNameResolver; private final ClassForNameResolver<T> classForNameResolver;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment