diff --git a/.settings/edu.umd.cs.findbugs.core.prefs b/.settings/edu.umd.cs.findbugs.core.prefs index 6428bfcdf4bd941fab0b1cff6b8148f19fc75747..272829c98a168fe30809408fb54fba974a022f37 100644 --- a/.settings/edu.umd.cs.findbugs.core.prefs +++ b/.settings/edu.umd.cs.findbugs.core.prefs @@ -1,5 +1,5 @@ #FindBugs User Preferences -#Tue Nov 18 10:57:28 CET 2014 +#Tue Nov 25 12:10:00 CET 2014 detector_threshold=3 effort=max excludefilter0=.fbExcludeFilterFile|true diff --git a/src/main/java/teetime/framework/pipe/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java similarity index 80% rename from src/main/java/teetime/framework/pipe/AbstractInterThreadPipe.java rename to src/main/java/teetime/framework/AbstractInterThreadPipe.java index e1504baad4e789fee4d8cee541153fa64167e4ea..872f965ea66e3c371b4fbde9da8571bc083400a4 100644 --- a/src/main/java/teetime/framework/pipe/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java @@ -1,4 +1,4 @@ -package teetime.framework.pipe; +package teetime.framework; import java.util.Queue; @@ -7,15 +7,14 @@ import org.jctools.queues.spec.ConcurrentQueueSpec; import org.jctools.queues.spec.Ordering; import org.jctools.queues.spec.Preference; -import teetime.framework.InputPort; -import teetime.framework.OutputPort; +import teetime.framework.pipe.AbstractPipe; import teetime.framework.signal.ISignal; public abstract class AbstractInterThreadPipe extends AbstractPipe { private final Queue<ISignal> signalQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT)); - <T> AbstractInterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + protected <T> AbstractInterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { super(sourcePort, targetPort); } diff --git a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java new file mode 100644 index 0000000000000000000000000000000000000000..9d0223009c18ca560f29d1589160d90a81e1012f --- /dev/null +++ b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java @@ -0,0 +1,24 @@ +package teetime.framework; + +import teetime.framework.pipe.AbstractPipe; +import teetime.framework.signal.ISignal; + +public abstract class AbstractIntraThreadPipe extends AbstractPipe { + + protected <T> AbstractIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + super(sourcePort, targetPort); + } + + @Override + public final void sendSignal(final ISignal signal) { + // if (this.getTargetPort() != null) { // BETTER remove this check since there are DummyPorts + this.cachedTargetStage.onSignal(signal, this.getTargetPort()); + // } + } + + @Override + public final void reportNewElement() { + this.cachedTargetStage.executeWithPorts(); + } + +} diff --git a/src/main/java/teetime/framework/AbstractProducerStage.java b/src/main/java/teetime/framework/AbstractProducerStage.java index 7918c922da305a933b8d6cfbee3a2a56c98eaee3..e34f6705c6fb7728a6e68f4a16405d477a2aa15e 100644 --- a/src/main/java/teetime/framework/AbstractProducerStage.java +++ b/src/main/java/teetime/framework/AbstractProducerStage.java @@ -9,7 +9,7 @@ package teetime.framework; * the type of the default output port * */ -public abstract class AbstractProducerStage<O> extends AbstractStage implements IStage { +public abstract class AbstractProducerStage<O> extends AbstractStage { protected final OutputPort<O> outputPort = this.createOutputPort(); diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 941261cd9371725ed5be8113d9c002dee101bdd1..5bb58812432667c97c13f7e1e8f5df8eb0411533 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -1,28 +1,16 @@ package teetime.framework; import java.util.ArrayList; -import java.util.HashMap; +import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.Set; import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.IPipe; import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; -public abstract class AbstractStage implements IStage { - - private final String id; - /** - * A unique logger instance per stage instance - */ - protected final Logger logger; // NOPMD - - private IStage parentStage; +public abstract class AbstractStage extends Stage { private final List<InputPort<?>> inputPortList = new ArrayList<InputPort<?>>(); private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>(); @@ -32,31 +20,9 @@ public abstract class AbstractStage implements IStage { /** A cached instance of <code>outputPortList</code> to avoid creating an iterator each time iterating it */ protected OutputPort<?>[] cachedOutputPorts; - private final Map<ISignal, Void> visited = new HashMap<ISignal, Void>(); + private final Set<ISignal> triggeredSignals = new HashSet<ISignal>(); private boolean shouldTerminate; - public AbstractStage() { - this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name - this.logger = LoggerFactory.getLogger(this.getClass().getName() + "(" + this.id + ")"); - } - - /** - * Sends the given <code>element</code> using the default output port - * - * @param element - * @return <code>true</code> iff the given element could be sent, <code>false</code> otherwise (then use a re-try strategy) - */ - protected final <O> boolean send(final OutputPort<O> outputPort, final O element) { - if (!outputPort.send(element)) { - return false; - } - - outputPort.reportNewElement(); - - return true; - // return outputPort.send(element); - } - private void connectUnconnectedOutputPorts() { for (OutputPort<?> outputPort : this.cachedOutputPorts) { if (null == outputPort.getPipe()) { // if port is unconnected @@ -74,21 +40,6 @@ public abstract class AbstractStage implements IStage { return this.cachedOutputPorts; } - @Override - public IStage getParentStage() { - return this.parentStage; - } - - @Override - public void setParentStage(final IStage parentStage, final int index) { - this.parentStage = parentStage; - } - - @Override - public String getId() { - return this.id; - } - /** * May not be invoked outside of IPipe implementations */ @@ -104,12 +55,12 @@ public abstract class AbstractStage implements IStage { } protected boolean alreadyVisited(final ISignal signal, final InputPort<?> inputPort) { - if (this.visited.containsKey(signal)) { + if (this.triggeredSignals.contains(signal)) { this.logger.trace("Got signal: " + signal + " again from input port: " + inputPort); return true; } else { this.logger.trace("Got signal: " + signal + " from input port: " + inputPort); - this.visited.put(signal, null); + this.triggeredSignals.add(signal); return false; } } @@ -158,11 +109,6 @@ public abstract class AbstractStage implements IStage { } } - @Override - public String toString() { - return this.getClass().getName() + ": " + this.id; - } - @Override public void terminate() { this.shouldTerminate = true; diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index 2e115cdc1feaf8bf0ebe58fd05ebe1024511fbf1..fc3b26ae8a331ed3aebbce382dd2cac6a104dc6b 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -28,8 +28,8 @@ public class Analysis implements UncaughtExceptionHandler { } public void init() { - final List<IStage> threadableStageJobs = this.configuration.getThreadableStageJobs(); - for (IStage stage : threadableStageJobs) { + final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); + for (Stage stage : threadableStageJobs) { final Thread thread = new Thread(new RunnableStage(stage)); switch (stage.getTerminationStrategy()) { case BY_SIGNAL: diff --git a/src/main/java/teetime/framework/AnalysisConfiguration.java b/src/main/java/teetime/framework/AnalysisConfiguration.java index 3850987eb23573d05ae4781ff4eeb1ff24187e6f..c6a0436bc33783ac7d044c817729389d3a61fafe 100644 --- a/src/main/java/teetime/framework/AnalysisConfiguration.java +++ b/src/main/java/teetime/framework/AnalysisConfiguration.java @@ -8,15 +8,15 @@ import teetime.framework.pipe.PipeFactoryRegistry; public class AnalysisConfiguration { protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; - private final List<IStage> threadableStageJobs = new LinkedList<IStage>(); + private final List<Stage> threadableStageJobs = new LinkedList<Stage>(); public AnalysisConfiguration() {} - List<IStage> getThreadableStageJobs() { + List<Stage> getThreadableStageJobs() { return this.threadableStageJobs; } - public void addThreadableStage(final IStage stage) { + public void addThreadableStage(final Stage stage) { this.threadableStageJobs.add(stage); } diff --git a/src/main/java/teetime/framework/IStage.java b/src/main/java/teetime/framework/IStage.java deleted file mode 100644 index 91fd7393da37758cdb1261f94efd3d99c7aa3664..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/IStage.java +++ /dev/null @@ -1,26 +0,0 @@ -package teetime.framework; - -import java.util.List; - -import teetime.framework.signal.ISignal; -import teetime.framework.validation.InvalidPortConnection; - -public interface IStage extends ITerminable { - - String getId(); - - void executeWithPorts(); - - IStage getParentStage(); - - void setParentStage(IStage parentStage, int index); - - void onSignal(ISignal signal, InputPort<?> inputPort); - - /** - * - * @param invalidPortConnections - * <i>(Passed as parameter for performance reasons)</i> - */ - void validateOutputPorts(List<InvalidPortConnection> invalidPortConnections); -} diff --git a/src/main/java/teetime/framework/ITerminable.java b/src/main/java/teetime/framework/ITerminable.java deleted file mode 100644 index 175db53931772336b8e6e4a40060673b605082e3..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/ITerminable.java +++ /dev/null @@ -1,11 +0,0 @@ -package teetime.framework; - -interface ITerminable { - - TerminationStrategy getTerminationStrategy(); - - void terminate(); - - boolean shouldBeTerminated(); - -} diff --git a/src/main/java/teetime/framework/InputPort.java b/src/main/java/teetime/framework/InputPort.java index ccdd19d9bc378b6863290eb0e3a62e081c893c7c..62b14d9bf8452ad4ae7defc8bbfda1ba10abdf75 100644 --- a/src/main/java/teetime/framework/InputPort.java +++ b/src/main/java/teetime/framework/InputPort.java @@ -4,9 +4,9 @@ import teetime.framework.pipe.IPipe; public class InputPort<T> extends AbstractPort<T> { - private final IStage owningStage; + private final Stage owningStage; - InputPort(final IStage owningStage) { + InputPort(final Stage owningStage) { super(); this.owningStage = owningStage; } @@ -33,7 +33,7 @@ public class InputPort<T> extends AbstractPort<T> { this.pipe = pipe; } - public IStage getOwningStage() { + public Stage getOwningStage() { return this.owningStage; } diff --git a/src/main/java/teetime/framework/OutputPort.java b/src/main/java/teetime/framework/OutputPort.java index 45a8de638a765f72d408dc5e84f856feac624ab6..ca8f84affc5410b5123f2c982ee55dd4cd071d29 100644 --- a/src/main/java/teetime/framework/OutputPort.java +++ b/src/main/java/teetime/framework/OutputPort.java @@ -9,20 +9,22 @@ public final class OutputPort<T> extends AbstractPort<T> { } /** - * * @param element - * @return <code>true</code> iff the given <code>element</code> could be sent, <code>false</code> otherwise (then use a re-try strategy) + * to be sent */ - public boolean send(final T element) { - return this.pipe.add(element); + public void send(final T element) { + if (this.pipe.add(element)) { + this.pipe.reportNewElement(); + } } + /** + * + * @param signal + * to be sent + */ public void sendSignal(final ISignal signal) { this.pipe.sendSignal(signal); } - public void reportNewElement() { - this.pipe.reportNewElement(); - } - } diff --git a/src/main/java/teetime/framework/RunnableStage.java b/src/main/java/teetime/framework/RunnableStage.java index b0d234bf016123f6c59d0a0d02ca5f068f022c74..89340d200883e727de82844f2d9bc6ff800de20f 100644 --- a/src/main/java/teetime/framework/RunnableStage.java +++ b/src/main/java/teetime/framework/RunnableStage.java @@ -8,14 +8,13 @@ import teetime.framework.signal.TerminatingSignal; import teetime.framework.signal.ValidatingSignal; import teetime.framework.validation.AnalysisNotValidException; -@SuppressWarnings("PMD.BeanMembersShouldSerialize") public class RunnableStage implements Runnable { - private final IStage stage; + private final Stage stage; private final Logger logger; // NOPMD private boolean validationEnabled; - public RunnableStage(final IStage stage) { + public RunnableStage(final Stage stage) { this.stage = stage; this.logger = LoggerFactory.getLogger(stage.getClass()); } @@ -47,9 +46,9 @@ public class RunnableStage implements Runnable { this.logger.error("Terminating thread due to the following exception: ", e); throw e; } // catch (RuntimeException e) { - // this.logger.error("Terminating thread due to the following exception: ", e); - // throw e; - // } + // this.logger.error("Terminating thread due to the following exception: ", e); + // throw e; + // } this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")"); } diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java new file mode 100644 index 0000000000000000000000000000000000000000..99f6c0b8d15139c066de47dd9c287e63dd797da9 --- /dev/null +++ b/src/main/java/teetime/framework/Stage.java @@ -0,0 +1,54 @@ +package teetime.framework; + +import java.util.List; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import teetime.framework.signal.ISignal; +import teetime.framework.validation.InvalidPortConnection; + +public abstract class Stage { + + private final String id; + /** + * A unique logger instance per stage instance + */ + protected final Logger logger; // NOPMD + + protected Stage() { + this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name + this.logger = LoggerFactory.getLogger(this.getClass().getName() + "(" + this.id + ")"); + } + + public String getId() { + return this.id; + } + + @Override + public String toString() { + return this.getClass().getName() + ": " + this.getId(); + } + + // public abstract Stage getParentStage(); + // + // public abstract void setParentStage(Stage parentStage, int index); + + /** + * + * @param invalidPortConnections + * <i>(Passed as parameter for performance reasons)</i> + */ + public abstract void validateOutputPorts(List<InvalidPortConnection> invalidPortConnections); + + protected abstract void executeWithPorts(); + + protected abstract void onSignal(ISignal signal, InputPort<?> inputPort); + + protected abstract TerminationStrategy getTerminationStrategy(); + + protected abstract void terminate(); + + protected abstract boolean shouldBeTerminated(); +} diff --git a/src/main/java/teetime/framework/pipe/AbstractIntraThreadPipe.java b/src/main/java/teetime/framework/pipe/AbstractIntraThreadPipe.java deleted file mode 100644 index b593b105add067a832a0e28dfc63a3816d730068..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/pipe/AbstractIntraThreadPipe.java +++ /dev/null @@ -1,26 +0,0 @@ -package teetime.framework.pipe; - -import teetime.framework.InputPort; -import teetime.framework.OutputPort; -import teetime.framework.signal.ISignal; - -public abstract class AbstractIntraThreadPipe extends AbstractPipe { - - - <T> AbstractIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - super(sourcePort, targetPort); - } - - @Override - public void sendSignal(final ISignal signal) { - if (this.getTargetPort() != null) { // BETTER remove this check since there are DummyPorts - this.cachedTargetStage.onSignal(signal, this.getTargetPort()); - } - } - - @Override - public final void reportNewElement() { - this.cachedTargetStage.executeWithPorts(); - } - -} diff --git a/src/main/java/teetime/framework/pipe/AbstractPipe.java b/src/main/java/teetime/framework/pipe/AbstractPipe.java index d3382dbc54193d873869dd41a338edc3dd852151..1790a4451479385452b89422bf43d75a5dfd9429 100644 --- a/src/main/java/teetime/framework/pipe/AbstractPipe.java +++ b/src/main/java/teetime/framework/pipe/AbstractPipe.java @@ -1,6 +1,6 @@ package teetime.framework.pipe; -import teetime.framework.IStage; +import teetime.framework.Stage; import teetime.framework.InputPort; import teetime.framework.OutputPort; @@ -13,7 +13,7 @@ public abstract class AbstractPipe implements IPipe { * this.getPipe().getTargetPort().getOwningStage() * </pre> */ - protected IStage cachedTargetStage; + protected Stage cachedTargetStage; private InputPort<?> targetPort; diff --git a/src/main/java/teetime/framework/pipe/CommittablePipe.java b/src/main/java/teetime/framework/pipe/CommittablePipe.java index 41cea9411884e8b3e5d796f38bc0b5290c3d579a..f425bbb45d4e541cee6c186b33e67389f502f733 100644 --- a/src/main/java/teetime/framework/pipe/CommittablePipe.java +++ b/src/main/java/teetime/framework/pipe/CommittablePipe.java @@ -1,5 +1,6 @@ package teetime.framework.pipe; +import teetime.framework.AbstractIntraThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.util.list.CommittableResizableArrayQueue; diff --git a/src/main/java/teetime/framework/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/framework/pipe/OrderedGrowableArrayPipe.java index eefa35be748a691469bb60562a37bcd23a5c8a46..d5db551a3dc5c8eafd1e8299d1152f6d7ea5b7a5 100644 --- a/src/main/java/teetime/framework/pipe/OrderedGrowableArrayPipe.java +++ b/src/main/java/teetime/framework/pipe/OrderedGrowableArrayPipe.java @@ -1,5 +1,6 @@ package teetime.framework.pipe; +import teetime.framework.AbstractIntraThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.util.concurrent.workstealing.CircularArray; diff --git a/src/main/java/teetime/framework/pipe/OrderedGrowableArrayPipeFactory.java b/src/main/java/teetime/framework/pipe/OrderedGrowableArrayPipeFactory.java index b67d7f0f521c5d948f27927ab4a500acd0152b73..a7a6151b33b890909db8a6d4957a1c3d25d4abe9 100644 --- a/src/main/java/teetime/framework/pipe/OrderedGrowableArrayPipeFactory.java +++ b/src/main/java/teetime/framework/pipe/OrderedGrowableArrayPipeFactory.java @@ -7,8 +7,6 @@ import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; public class OrderedGrowableArrayPipeFactory implements IPipeFactory { - public OrderedGrowableArrayPipeFactory() {} - @Override public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { return this.create(sourcePort, targetPort, 4); diff --git a/src/main/java/teetime/framework/pipe/OrderedGrowablePipe.java b/src/main/java/teetime/framework/pipe/OrderedGrowablePipe.java index 23b3d43f83a2507f7ec1c15998fa1ff90641987a..8901664518c63466d4f759e171e1d8afdb28967c 100644 --- a/src/main/java/teetime/framework/pipe/OrderedGrowablePipe.java +++ b/src/main/java/teetime/framework/pipe/OrderedGrowablePipe.java @@ -2,6 +2,7 @@ package teetime.framework.pipe; import java.util.LinkedList; +import teetime.framework.AbstractIntraThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; diff --git a/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java b/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java index e58d60083aa7f4c2eb4f00de7a79c552df5b56bb..6fcbda81d8ff073ca27a9d2863b4ec02ad6f4688 100644 --- a/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java +++ b/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java @@ -12,7 +12,7 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import teetime.framework.FileSearcher; +import teetime.util.classpath.FileSearcher; public final class PipeFactoryLoader { diff --git a/src/main/java/teetime/framework/pipe/RelayTestPipe.java b/src/main/java/teetime/framework/pipe/RelayTestPipe.java index 3a7bc13c99fa68e1660efcd5a4d034fe50988c82..ff09a32a51931f62ddb2772ea765668c4cb2d551 100644 --- a/src/main/java/teetime/framework/pipe/RelayTestPipe.java +++ b/src/main/java/teetime/framework/pipe/RelayTestPipe.java @@ -1,5 +1,6 @@ package teetime.framework.pipe; +import teetime.framework.AbstractInterThreadPipe; import teetime.util.ConstructorClosure; public final class RelayTestPipe<T> extends AbstractInterThreadPipe { diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipe.java b/src/main/java/teetime/framework/pipe/SingleElementPipe.java index f71d3957c09f96baee6574dcb07c31568de4b009..cc519a2b4402e1049af59ce1d249b04f6563f918 100644 --- a/src/main/java/teetime/framework/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/framework/pipe/SingleElementPipe.java @@ -1,5 +1,6 @@ package teetime.framework.pipe; +import teetime.framework.AbstractIntraThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java b/src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java index d888e4c0239eb6c7f4c2b6be8ea383f744d52176..095d9c6efa8dba8c78e41b03a2231eb4ef9598ce 100644 --- a/src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java +++ b/src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java @@ -7,15 +7,15 @@ import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; public class SingleElementPipeFactory implements IPipeFactory { - public SingleElementPipeFactory() {} - @Override public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { return this.create(sourcePort, targetPort, 1); } /** - * Hint: The capacity for this pipe implementation is ignored + * Hint: The capacity for this pipe implementation is ignored. + * <p> + * {@inheritDoc} */ @Override public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index 4941e4a7dca7c10e16019332d9f73fb801fd5199..8facbf44a72c4aa96087f8fe2372d54a6c7d7a7f 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -7,6 +7,7 @@ import org.jctools.queues.spec.ConcurrentQueueSpec; import org.jctools.queues.spec.Ordering; import org.jctools.queues.spec.Preference; +import teetime.framework.AbstractInterThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; diff --git a/src/main/java/teetime/framework/pipe/SpScPipeFactory.java b/src/main/java/teetime/framework/pipe/SpScPipeFactory.java index e5e76c573f85b3d306e211635d4b9d9b0f51963d..9c7b9bd7c5e42305033e792d732622ed2cbd7cf4 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipeFactory.java +++ b/src/main/java/teetime/framework/pipe/SpScPipeFactory.java @@ -7,8 +7,6 @@ import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; public class SpScPipeFactory implements IPipeFactory { - public SpScPipeFactory() {} - @Override public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { return this.create(sourcePort, targetPort, 4); diff --git a/src/main/java/teetime/framework/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/framework/pipe/UnorderedGrowablePipe.java index 76075d7c64ab20805ad540bfa3a0225610b2b7c4..6fb7e9a12b0c2487cf49f34ee972af63a46c9958 100644 --- a/src/main/java/teetime/framework/pipe/UnorderedGrowablePipe.java +++ b/src/main/java/teetime/framework/pipe/UnorderedGrowablePipe.java @@ -1,5 +1,6 @@ package teetime.framework.pipe; +import teetime.framework.AbstractIntraThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; diff --git a/src/main/java/teetime/framework/pipe/UnorderedGrowablePipeFactory.java b/src/main/java/teetime/framework/pipe/UnorderedGrowablePipeFactory.java index 38e1d6692cbb91ea34b0cf97b96bd3ee9ea9a170..159b108e18bc7505b2a1a86c8ccc63ba05fd32f8 100644 --- a/src/main/java/teetime/framework/pipe/UnorderedGrowablePipeFactory.java +++ b/src/main/java/teetime/framework/pipe/UnorderedGrowablePipeFactory.java @@ -7,8 +7,6 @@ import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; public class UnorderedGrowablePipeFactory implements IPipeFactory { - public UnorderedGrowablePipeFactory() {} - @Override public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { return this.create(sourcePort, targetPort, 4); diff --git a/src/main/java/teetime/stage/ByteArray2String.java b/src/main/java/teetime/stage/ByteArray2String.java index eb27f89b5cd68cbac09ef5cf6a9ed806cf5205d8..aca5a438e085a277ea780f709695104387e25f0e 100644 --- a/src/main/java/teetime/stage/ByteArray2String.java +++ b/src/main/java/teetime/stage/ByteArray2String.java @@ -11,7 +11,7 @@ public class ByteArray2String extends AbstractConsumerStage<byte[]> { @Override protected void execute(final byte[] element) { - this.send(this.outputPort, new String(element, Charset.forName("UTF-8"))); + outputPort.send(new String(element, Charset.forName("UTF-8"))); } public OutputPort<? extends String> getOutputPort() { diff --git a/src/main/java/teetime/stage/Cache.java b/src/main/java/teetime/stage/Cache.java index df63a6232abbbb6963f17761870ca9bd0901a236..a5991589b69f63ec746ea15e3dc09d021a51be61 100644 --- a/src/main/java/teetime/stage/Cache.java +++ b/src/main/java/teetime/stage/Cache.java @@ -25,7 +25,7 @@ public class Cache<T> extends AbstractConsumerStage<T> { StopWatch stopWatch = new StopWatch(); stopWatch.start(); for (T cachedElement : this.cachedObjects) { - this.send(this.outputPort, cachedElement); + outputPort.send(cachedElement); } stopWatch.end(); this.logger.debug("Emitting took " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms"); diff --git a/src/main/java/teetime/stage/CipherByteArray.java b/src/main/java/teetime/stage/CipherByteArray.java index 62d8db9907bef8f6e7fe470358f04fcf7a55c910..f91682f169581d321f96ba6b96292638698546c6 100644 --- a/src/main/java/teetime/stage/CipherByteArray.java +++ b/src/main/java/teetime/stage/CipherByteArray.java @@ -75,7 +75,7 @@ public class CipherByteArray extends AbstractConsumerStage<byte[]> { e.printStackTrace(); } - this.send(this.outputPort, output); + outputPort.send(output); } public OutputPort<? extends byte[]> getOutputPort() { diff --git a/src/main/java/teetime/stage/Clock.java b/src/main/java/teetime/stage/Clock.java index a536bc90803d931da45d499a864a422d4f286bd9..96a9544184231de7248151b896d6c5a9ead78750 100644 --- a/src/main/java/teetime/stage/Clock.java +++ b/src/main/java/teetime/stage/Clock.java @@ -25,7 +25,7 @@ public class Clock extends AbstractProducerStage<Long> { } // this.logger.debug("Emitting timestamp"); - this.send(this.outputPort, this.getCurrentTimeInNs()); + outputPort.send(this.getCurrentTimeInNs()); } private void sleep(final long delayInMs) { diff --git a/src/main/java/teetime/stage/CollectorSink.java b/src/main/java/teetime/stage/CollectorSink.java index 6797fbde77b7182bcca65ed5ef9d91379e073474..eb4f3a0c5cb4aba15ce875f464499e5d28af8f42 100644 --- a/src/main/java/teetime/stage/CollectorSink.java +++ b/src/main/java/teetime/stage/CollectorSink.java @@ -15,6 +15,7 @@ ***************************************************************************/ package teetime.stage; +import java.util.ArrayList; import java.util.List; import teetime.framework.AbstractConsumerStage; @@ -22,45 +23,26 @@ import teetime.framework.AbstractConsumerStage; /** * @author Christian Wulf * - * @since 1.10 + * @since 1.0 */ -public class CollectorSink<T> extends AbstractConsumerStage<T> { - - // private final InputPort<T> inputPort = this.createInputPort(); - // - // public final InputPort<T> getInputPort() { - // return this.inputPort; - // } +public final class CollectorSink<T> extends AbstractConsumerStage<T> { private final List<T> elements; - private final int threshold; - public CollectorSink(final List<T> list, final int threshold) { - this.elements = list; - this.threshold = threshold; + /** + * Creates a new {@link CollectorSink} with an {@link ArrayList}. + */ + public CollectorSink() { + this(new ArrayList<T>()); } public CollectorSink(final List<T> list) { - this(list, 100000); - } - - @Override - public void onTerminating() throws Exception { - System.out.println("size: " + this.elements.size()); - super.onTerminating(); + this.elements = list; } @Override protected void execute(final T element) { this.elements.add(element); - - if ((this.elements.size() % this.threshold) == 0) { - System.out.println("size: " + this.elements.size()); - } - - // if (this.elements.size() > 90000) { - // // System.out.println("size > 90000: " + this.elements.size()); - // } } } diff --git a/src/main/java/teetime/stage/Counter.java b/src/main/java/teetime/stage/Counter.java index 1dc0ccd2ec6e0d64f95f5811aad16dcabe81bd47..0a9b1c88b65d0a480dbb572b4b9263f3bde7a30d 100644 --- a/src/main/java/teetime/stage/Counter.java +++ b/src/main/java/teetime/stage/Counter.java @@ -12,8 +12,8 @@ public class Counter<T> extends AbstractConsumerStage<T> { @Override protected void execute(final T element) { this.numElementsPassed++; - // this.logger.debug("count: " + this.numElementsPassed); - this.send(this.outputPort, element); + + outputPort.send(element); } // BETTER find a solution w/o any thread-safe code in this stage diff --git a/src/main/java/teetime/stage/ElementDelayMeasuringStage.java b/src/main/java/teetime/stage/ElementDelayMeasuringStage.java index 16ae63749b53654135f4613f66e4b9e1213134cd..5ba5719466bbd1e1a06bf7c4aa97ca8b11e0e622 100644 --- a/src/main/java/teetime/stage/ElementDelayMeasuringStage.java +++ b/src/main/java/teetime/stage/ElementDelayMeasuringStage.java @@ -25,7 +25,7 @@ public class ElementDelayMeasuringStage<T> extends AbstractConsumerStage<T> { } this.numPassedElements++; - this.send(this.outputPort, element); + outputPort.send(element); } @Override diff --git a/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java b/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java index de5958b14cea672b878afd2424e72404284cc036..fe3821c7e2b6bc5dfd83aa5e452a635f68f6cb83 100644 --- a/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java +++ b/src/main/java/teetime/stage/ElementThroughputMeasuringStage.java @@ -26,7 +26,7 @@ public class ElementThroughputMeasuringStage<T> extends AbstractConsumerStage<T> } this.numPassedElements++; - this.send(this.outputPort, element); + outputPort.send(element); } @Override diff --git a/src/main/java/teetime/stage/EveryXthStage.java b/src/main/java/teetime/stage/EveryXthStage.java new file mode 100644 index 0000000000000000000000000000000000000000..4f3ff730d9f88e2e8be32555fc0fe01189ac3c4c --- /dev/null +++ b/src/main/java/teetime/stage/EveryXthStage.java @@ -0,0 +1,30 @@ +package teetime.stage; + +import teetime.framework.AbstractConsumerStage; +import teetime.framework.OutputPort; + +public final class EveryXthStage<T> extends AbstractConsumerStage<T> { + + private final OutputPort<Integer> outputPort = createOutputPort(); + + private final int threshold; + + private int counter; + + public EveryXthStage(final int threshold) { + this.threshold = threshold; + } + + @Override + protected void execute(final T element) { + counter++; + if (counter % threshold == 0) { + outputPort.send(Integer.valueOf(counter)); + } + } + + public OutputPort<Integer> getOutputPort() { + return outputPort; + } + +} diff --git a/src/main/java/teetime/stage/FileExtensionSwitch.java b/src/main/java/teetime/stage/FileExtensionSwitch.java index 1ab666f26b1156c94d66ca6bd75072080d51549a..5099b25adccfb8054b28ff8ad32add3daa6787f1 100644 --- a/src/main/java/teetime/stage/FileExtensionSwitch.java +++ b/src/main/java/teetime/stage/FileExtensionSwitch.java @@ -1,26 +1,36 @@ package teetime.stage; import java.io.File; -import java.util.HashMap; import java.util.Map; import teetime.framework.AbstractConsumerStage; import teetime.framework.OutputPort; +import teetime.util.HashMapWithDefault; +import teetime.util.concurrent.hashmap.ValueFactory; import com.google.common.io.Files; -public class FileExtensionSwitch extends AbstractConsumerStage<File> { +public final class FileExtensionSwitch extends AbstractConsumerStage<File> { - private final Map<String, OutputPort<File>> fileExtensions = new HashMap<String, OutputPort<File>>(); + private final OutputPort<File> unknownFileExtensionOutputPort = createOutputPort(); + + // BETTER use the hppc ObjectObjectMap that provide getOrDefault() + private final Map<String, OutputPort<File>> fileExtensions = new HashMapWithDefault<String, OutputPort<File>>(new ValueFactory<OutputPort<File>>() { + @Override + public OutputPort<File> create() { + return unknownFileExtensionOutputPort; + } + }); @Override protected void execute(final File file) { String fileExtension = Files.getFileExtension(file.getAbsolutePath()); - this.logger.debug("fileExtension: " + fileExtension); - OutputPort<File> outputPort = this.fileExtensions.get(fileExtension); - if (outputPort != null) { - this.send(outputPort, file); + if (logger.isDebugEnabled()) { + this.logger.debug("fileExtension: " + fileExtension); } + + OutputPort<File> outputPort = this.fileExtensions.get(fileExtension); + outputPort.send(file); } public OutputPort<File> addFileExtension(String fileExtension) { diff --git a/src/main/java/teetime/stage/InitialElementProducer.java b/src/main/java/teetime/stage/InitialElementProducer.java index 1b167e481ee5ec46a2a7b768f3be705c268ea768..12af6bf098280e0b1d1d6ffd88e30e3eadec7df9 100644 --- a/src/main/java/teetime/stage/InitialElementProducer.java +++ b/src/main/java/teetime/stage/InitialElementProducer.java @@ -13,7 +13,7 @@ public class InitialElementProducer<T> extends AbstractProducerStage<T> { @Override protected void execute() { for (T e : this.elements) { - this.send(this.outputPort, e); + outputPort.send(e); } this.terminate(); } diff --git a/src/main/java/teetime/stage/InstanceCounter.java b/src/main/java/teetime/stage/InstanceCounter.java index 9db60208c598f41881a99adf38134c07479a9585..16a57baa62f78556fe13dad0427eec1aa4770818 100644 --- a/src/main/java/teetime/stage/InstanceCounter.java +++ b/src/main/java/teetime/stage/InstanceCounter.java @@ -3,7 +3,7 @@ package teetime.stage; import teetime.framework.AbstractConsumerStage; import teetime.framework.OutputPort; -public class InstanceCounter<T, C extends T> extends AbstractConsumerStage<T> { +public final class InstanceCounter<T, C extends T> extends AbstractConsumerStage<T> { private final OutputPort<T> outputPort = this.createOutputPort(); @@ -20,7 +20,7 @@ public class InstanceCounter<T, C extends T> extends AbstractConsumerStage<T> { this.counter++; } - this.send(this.outputPort, element); + outputPort.send(element); } public int getCounter() { diff --git a/src/main/java/teetime/stage/InstanceOfFilter.java b/src/main/java/teetime/stage/InstanceOfFilter.java index d2e381c078cdc427b62a3ba3803316b9883616e2..710ea79f2b42ee100ecb468a2388346c6c384066 100644 --- a/src/main/java/teetime/stage/InstanceOfFilter.java +++ b/src/main/java/teetime/stage/InstanceOfFilter.java @@ -5,9 +5,9 @@ import teetime.framework.OutputPort; /** * @author Jan Waller, Nils Christian Ehmke, Christian Wulf - * + * */ -public class InstanceOfFilter<I, O> extends AbstractConsumerStage<I> { +public final class InstanceOfFilter<I, O> extends AbstractConsumerStage<I> { private final OutputPort<O> outputPort = this.createOutputPort(); @@ -21,7 +21,7 @@ public class InstanceOfFilter<I, O> extends AbstractConsumerStage<I> { @Override protected void execute(final I element) { if (this.type.isInstance(element)) { - this.send(this.outputPort, (O) element); + outputPort.send((O) element); } else { // swallow up the element if (this.logger.isDebugEnabled()) { this.logger.info("element is not an instance of " + this.type.getName() + ", but of " + element.getClass()); diff --git a/src/main/java/teetime/stage/IterableProducer.java b/src/main/java/teetime/stage/IterableProducer.java index 398d6e6158107205278bd1c69c0b127459525afe..b66245ca71111456570c9ee8db6c3f0250ef2d9f 100644 --- a/src/main/java/teetime/stage/IterableProducer.java +++ b/src/main/java/teetime/stage/IterableProducer.java @@ -2,7 +2,7 @@ package teetime.stage; import teetime.framework.AbstractProducerStage; -public class IterableProducer<O extends Iterable<T>, T> extends AbstractProducerStage<T> { +public final class IterableProducer<O extends Iterable<T>, T> extends AbstractProducerStage<T> { private O iter = null; @@ -13,7 +13,7 @@ public class IterableProducer<O extends Iterable<T>, T> extends AbstractProducer @Override protected void execute() { for (T i : iter) { - this.send(this.outputPort, i); + outputPort.send(i); } } diff --git a/src/main/java/teetime/stage/NoopFilter.java b/src/main/java/teetime/stage/NoopFilter.java index a023df02b285871aeccc90da87716db49feb1a29..1b777bf0d01a06235056ab23e2b217416efe0b01 100644 --- a/src/main/java/teetime/stage/NoopFilter.java +++ b/src/main/java/teetime/stage/NoopFilter.java @@ -21,15 +21,15 @@ import teetime.framework.OutputPort; /** * @author Christian Wulf * - * @since 1.10 + * @since 1.0 */ -public class NoopFilter<T> extends AbstractConsumerStage<T> { +public final class NoopFilter<T> extends AbstractConsumerStage<T> { private final OutputPort<T> outputPort = this.createOutputPort(); @Override protected void execute(final T element) { - this.send(this.outputPort, element); + outputPort.send(element); } public OutputPort<T> getOutputPort() { diff --git a/src/main/java/teetime/stage/ObjectProducer.java b/src/main/java/teetime/stage/ObjectProducer.java index 7b9b76d575e9cb82c8a92e5ca07ecbb926b468b4..b4969d4ad4e42e7fd803d4d45f86b3713959f494 100644 --- a/src/main/java/teetime/stage/ObjectProducer.java +++ b/src/main/java/teetime/stage/ObjectProducer.java @@ -21,15 +21,15 @@ import teetime.util.ConstructorClosure; /** * @author Christian Wulf * - * @since 1.10 + * @since 1.0 */ -public class ObjectProducer<T> extends AbstractProducerStage<T> { +public final class ObjectProducer<T> extends AbstractProducerStage<T> { private long numInputObjects; private ConstructorClosure<T> inputObjectCreator; /** - * @since 1.10 + * @since 1.0 */ public ObjectProducer(final long numInputObjects, final ConstructorClosure<T> inputObjectCreator) { this.numInputObjects = numInputObjects; @@ -57,7 +57,7 @@ public class ObjectProducer<T> extends AbstractProducerStage<T> { T newObject = this.inputObjectCreator.create(); this.numInputObjects--; - this.send(this.outputPort, newObject); + outputPort.send(newObject); if (this.numInputObjects == 0) { this.terminate(); diff --git a/src/main/java/teetime/stage/Relay.java b/src/main/java/teetime/stage/Relay.java index 0d898b3d4bd305bbe8b08f7bfe5cbf77cf3ecf3f..96e303348613094e57ddec5cd177c22bdf4ecdde 100644 --- a/src/main/java/teetime/stage/Relay.java +++ b/src/main/java/teetime/stage/Relay.java @@ -1,11 +1,11 @@ package teetime.stage; -import teetime.framework.InputPort; +import teetime.framework.AbstractInterThreadPipe; import teetime.framework.AbstractProducerStage; -import teetime.framework.pipe.AbstractInterThreadPipe; +import teetime.framework.InputPort; import teetime.framework.signal.TerminatingSignal; -public class Relay<T> extends AbstractProducerStage<T> { +public final class Relay<T> extends AbstractProducerStage<T> { private final InputPort<T> inputPort = this.createInputPort(); @@ -21,7 +21,7 @@ public class Relay<T> extends AbstractProducerStage<T> { Thread.yield(); return; } - this.send(this.outputPort, element); + outputPort.send(element); } @Override diff --git a/src/main/java/teetime/stage/StartTimestampFilter.java b/src/main/java/teetime/stage/StartTimestampFilter.java index 4f5b50eaa14c4fb98ad8a494c41d6a558f82f59b..a229af1656f23e6229fec3b008aff91d2c067b2a 100644 --- a/src/main/java/teetime/stage/StartTimestampFilter.java +++ b/src/main/java/teetime/stage/StartTimestampFilter.java @@ -24,14 +24,14 @@ import teetime.util.TimestampObject; * * @since 1.10 */ -public class StartTimestampFilter extends AbstractConsumerStage<TimestampObject> { +public final class StartTimestampFilter extends AbstractConsumerStage<TimestampObject> { private final OutputPort<TimestampObject> outputPort = this.createOutputPort(); @Override protected void execute(final TimestampObject element) { element.setStartTimestamp(System.nanoTime()); - this.send(this.outputPort, element); + outputPort.send(element); } public OutputPort<TimestampObject> getOutputPort() { diff --git a/src/main/java/teetime/stage/StopTimestampFilter.java b/src/main/java/teetime/stage/StopTimestampFilter.java index 49385bf2e23924c8cea16d268d78e8eedf8fdfeb..253ea8b0fb872049cd13d5b8b2f75cc86fcddd15 100644 --- a/src/main/java/teetime/stage/StopTimestampFilter.java +++ b/src/main/java/teetime/stage/StopTimestampFilter.java @@ -24,14 +24,14 @@ import teetime.util.TimestampObject; * * @since 1.10 */ -public class StopTimestampFilter extends AbstractConsumerStage<TimestampObject> { +public final class StopTimestampFilter extends AbstractConsumerStage<TimestampObject> { private final OutputPort<TimestampObject> outputPort = this.createOutputPort(); @Override protected void execute(final TimestampObject element) { element.setStopTimestamp(System.nanoTime()); - this.send(this.outputPort, element); + outputPort.send(element); } public OutputPort<TimestampObject> getOutputPort() { diff --git a/src/main/java/teetime/stage/ZipByteArray.java b/src/main/java/teetime/stage/ZipByteArray.java index 36dc9dbef3e3db0328e9b77f07a4230f305b8ca4..8b251ce669d61a63d0293e1edebdf82ec5bcb174 100644 --- a/src/main/java/teetime/stage/ZipByteArray.java +++ b/src/main/java/teetime/stage/ZipByteArray.java @@ -15,7 +15,7 @@ import teetime.framework.OutputPort; * @author Nelson Tavares de Sousa * */ -public class ZipByteArray extends AbstractConsumerStage<byte[]> { +public final class ZipByteArray extends AbstractConsumerStage<byte[]> { private final OutputPort<byte[]> outputPort = this.createOutputPort(); private final ZipMode mode; @@ -40,7 +40,7 @@ public class ZipByteArray extends AbstractConsumerStage<byte[]> { } catch (Exception e) { e.printStackTrace(); } - this.send(this.outputPort, cache); + outputPort.send(cache); } private byte[] compress(final byte[] data) throws IOException { diff --git a/src/main/java/teetime/stage/basic/Delay.java b/src/main/java/teetime/stage/basic/Delay.java index 4682ba64fe7a9afa2b6258fc132a090c0ae1c461..29f152d0fd8589b1d5e75da901f3269baef878c1 100644 --- a/src/main/java/teetime/stage/basic/Delay.java +++ b/src/main/java/teetime/stage/basic/Delay.java @@ -7,7 +7,7 @@ import teetime.framework.AbstractStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; -public class Delay<T> extends AbstractStage { +public final class Delay<T> extends AbstractStage { private final InputPort<T> inputPort = this.createInputPort(); private final InputPort<Long> timestampTriggerInputPort = this.createInputPort(); @@ -29,7 +29,7 @@ public class Delay<T> extends AbstractStage { while (!bufferedElements.isEmpty()) { element = bufferedElements.remove(0); - this.send(this.outputPort, element); + outputPort.send(element); } } diff --git a/src/main/java/teetime/stage/basic/Sink.java b/src/main/java/teetime/stage/basic/Sink.java index 1b790990958c02d1be79be5c460bdb39d7f838e4..87ee070d18dc732150a6ef9cd8e4c2e1abff9002 100644 --- a/src/main/java/teetime/stage/basic/Sink.java +++ b/src/main/java/teetime/stage/basic/Sink.java @@ -2,7 +2,7 @@ package teetime.stage.basic; import teetime.framework.AbstractConsumerStage; -public class Sink<T> extends AbstractConsumerStage<T> { +public final class Sink<T> extends AbstractConsumerStage<T> { // PERFORMANCE let the sink remove all available input at once by using a new method receiveAll() that clears the pipe's buffer diff --git a/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java b/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java index d5f2697228bbfc7851057eae82e24fa71fe92bcb..3a31976eac4d3831fce0230c543e27631625ed39 100644 --- a/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java +++ b/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java @@ -26,8 +26,8 @@ public final class CopyByReferenceStrategy<T> implements IDistributorStrategy<T> @Override public boolean distribute(final OutputPort<T>[] outputPorts, final T element) { - for (final OutputPort<T> port : outputPorts) { - port.send(element); + for (final OutputPort<T> outputPort : outputPorts) { + outputPort.send(element); } return true; diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java index 1f2ed53e5accb5e9bf654b3a4084b68ae3d59875..5fecfe44430510cf97bfdd17097bd6cee527bc84 100644 --- a/src/main/java/teetime/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java @@ -22,7 +22,7 @@ import teetime.framework.OutputPort; /** * @author Christian Wulf * - * @since 1.10 + * @since 1.0 * * @param T * the type of the input port and the output ports diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index 1ea6e2a1987d9ef99c19c0f4b68a0d6a84810519..69b778a4751b8777cf9fab2656972b7af4c4daff 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -17,7 +17,9 @@ package teetime.stage.basic.merger; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import teetime.framework.AbstractStage; import teetime.framework.InputPort; @@ -27,6 +29,7 @@ import teetime.framework.signal.ISignal; /** * * This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port. + * For its signal handling behavior see {@link #onSignal(ISignal, InputPort)} * * @author Christian Wulf * @@ -35,13 +38,13 @@ import teetime.framework.signal.ISignal; * @param <T> * the type of both the input and output ports */ -public class Merger<T> extends AbstractStage { +public final class Merger<T> extends AbstractStage { private final OutputPort<T> outputPort = this.createOutputPort(); private IMergerStrategy<T> strategy = new RoundRobinStrategy<T>(); - private final Map<Class<?>, Integer> signalMap = new HashMap<Class<?>, Integer>(); + private final Map<Class<?>, Set<InputPort<?>>> signalMap = new HashMap<Class<?>, Set<InputPort<?>>>(); @Override public void executeWithPorts() { @@ -50,25 +53,39 @@ public class Merger<T> extends AbstractStage { return; } - this.send(this.outputPort, token); + outputPort.send(token); } + /** + * This method is executed, if a signal is sent to a instance of this class. + * Multiple signals of one certain type are ignored, if they are sent to same port. + * Hence a signal is only passed on, when it arrived on all input ports, regardless how often. + * + * @param signal + * Signal which is sent + * + * @param inputPort + * The port which the signal was sent to + */ @Override public void onSignal(final ISignal signal, final InputPort<?> inputPort) { this.logger.trace("Got signal: " + signal + " from input port: " + inputPort); if (signalMap.containsKey(signal.getClass())) { - int value = signalMap.get(signal.getClass()); - value++; - if (value == this.getInputPorts().length) { + Set<InputPort<?>> set = signalMap.get(signal.getClass()); + if (!set.add(inputPort)) { + this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort); + } + + if (set.size() == this.getInputPorts().length) { this.outputPort.sendSignal(signal); signalMap.remove(signal.getClass()); - } else { - signalMap.put(signal.getClass(), value); } } else { signal.trigger(this); - signalMap.put(signal.getClass(), 1); + Set<InputPort<?>> tempSet = new HashSet<InputPort<?>>(); + tempSet.add(inputPort); + signalMap.put(signal.getClass(), tempSet); } } diff --git a/src/main/java/teetime/stage/io/Directory2FilesFilter.java b/src/main/java/teetime/stage/io/Directory2FilesFilter.java index 426cad8c76d5c88ee51233c09bf0c39668c76580..4fe0d8e3c29ad159df974edbe152b771bd0a74b1 100644 --- a/src/main/java/teetime/stage/io/Directory2FilesFilter.java +++ b/src/main/java/teetime/stage/io/Directory2FilesFilter.java @@ -79,7 +79,7 @@ public class Directory2FilesFilter extends AbstractConsumerStage<File> { } for (final File file : inputFiles) { - this.send(this.outputPort, file); + outputPort.send(file); } } diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java new file mode 100644 index 0000000000000000000000000000000000000000..1af922a1521d94f5db344d878bf751771c461aa1 --- /dev/null +++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java @@ -0,0 +1,73 @@ +package teetime.stage.io; + +import java.util.List; + +import teetime.framework.InputPort; +import teetime.framework.OutputPort; +import teetime.framework.Stage; +import teetime.framework.TerminationStrategy; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.framework.signal.ISignal; +import teetime.framework.validation.InvalidPortConnection; +import teetime.stage.EveryXthStage; +import teetime.stage.basic.distributor.CopyByReferenceStrategy; +import teetime.stage.basic.distributor.Distributor; + +public final class EveryXthPrinter<T> extends Stage { + + private final Distributor<T> distributor; + + public EveryXthPrinter(final int threshold) { + distributor = new Distributor<T>(); + EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold); + Printer<Integer> printer = new Printer<Integer>(); + + IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + pipeFactory.create(distributor.getNewOutputPort(), everyXthStage.getInputPort()); + pipeFactory.create(everyXthStage.getOutputPort(), printer.getInputPort()); + + distributor.setStrategy(new CopyByReferenceStrategy<T>()); + } + + @Override + protected void executeWithPorts() { + distributor.executeWithPorts(); + } + + @Override + public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { + distributor.validateOutputPorts(invalidPortConnections); + } + + @Override + protected void onSignal(final ISignal signal, final InputPort<?> inputPort) { + distributor.onSignal(signal, inputPort); + } + + @Override + protected TerminationStrategy getTerminationStrategy() { + return distributor.getTerminationStrategy(); + } + + @Override + protected void terminate() { + distributor.terminate(); + } + + @Override + protected boolean shouldBeTerminated() { + return distributor.shouldBeTerminated(); + } + + public InputPort<T> getInputPort() { + return distributor.getInputPort(); + } + + public OutputPort<T> getNewOutputPort() { + return distributor.getNewOutputPort(); + } + +} diff --git a/src/main/java/teetime/stage/io/File2ByteArray.java b/src/main/java/teetime/stage/io/File2ByteArray.java index 647b42ae7731219cf97bf732064c0238e94c675d..1319b4fb6959e38513ba3c8e101174fa708f6f41 100644 --- a/src/main/java/teetime/stage/io/File2ByteArray.java +++ b/src/main/java/teetime/stage/io/File2ByteArray.java @@ -5,11 +5,10 @@ import java.io.IOException; import teetime.framework.AbstractConsumerStage; import teetime.framework.OutputPort; -import teetime.framework.IStage; import com.google.common.io.Files; -public class File2ByteArray extends AbstractConsumerStage<File> implements IStage { +public class File2ByteArray extends AbstractConsumerStage<File> { private final OutputPort<byte[]> outputPort = this.createOutputPort(); @@ -17,7 +16,7 @@ public class File2ByteArray extends AbstractConsumerStage<File> implements IStag protected void execute(final File element) { try { byte[] cache = Files.toByteArray(element); - this.send(this.outputPort, cache); + outputPort.send(cache); } catch (IOException e) { throw new IllegalStateException(e); } diff --git a/src/main/java/teetime/stage/io/File2TextLinesFilter.java b/src/main/java/teetime/stage/io/File2TextLinesFilter.java index 7acd02c9ab69a107f4b7c2d687bf501bb65ca157..329d759408ca055fb35c608d05afe9a6746bb617 100644 --- a/src/main/java/teetime/stage/io/File2TextLinesFilter.java +++ b/src/main/java/teetime/stage/io/File2TextLinesFilter.java @@ -46,7 +46,7 @@ public class File2TextLinesFilter extends AbstractConsumerStage<File> { while ((line = reader.readLine()) != null) { line = line.trim(); if (line.length() != 0) { - this.send(this.outputPort, new TextLine(textFile, line)); + outputPort.send(new TextLine(textFile, line)); } // else: ignore empty line } } catch (final FileNotFoundException e) { diff --git a/src/main/java/teetime/stage/Tokenizer.java b/src/main/java/teetime/stage/string/Tokenizer.java similarity index 80% rename from src/main/java/teetime/stage/Tokenizer.java rename to src/main/java/teetime/stage/string/Tokenizer.java index 96064e6e0f5009bd6512c13714e86390675af7fc..812dedf4edeba7a0eaf0ecbd76ce95600aab6819 100644 --- a/src/main/java/teetime/stage/Tokenizer.java +++ b/src/main/java/teetime/stage/string/Tokenizer.java @@ -1,11 +1,11 @@ -package teetime.stage; +package teetime.stage.string; import java.util.StringTokenizer; import teetime.framework.AbstractConsumerStage; import teetime.framework.OutputPort; -public class Tokenizer extends AbstractConsumerStage<String> { +public final class Tokenizer extends AbstractConsumerStage<String> { private final OutputPort<String> outputPort = this.createOutputPort(); private final String regex; @@ -18,7 +18,7 @@ public class Tokenizer extends AbstractConsumerStage<String> { protected void execute(final String element) { StringTokenizer st = new StringTokenizer(element, this.regex); while (st.hasMoreTokens()) { - this.send(this.outputPort, st.nextToken()); + outputPort.send(st.nextToken()); } } diff --git a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java b/src/main/java/teetime/stage/string/buffer/StringBufferFilter.java similarity index 90% rename from src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java rename to src/main/java/teetime/stage/string/buffer/StringBufferFilter.java index 73a8e99c03d1e978a1a76df9851b379304872d70..6b20b26f189c2fc6ae1f55d590b8d5b5545bbeac 100644 --- a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java +++ b/src/main/java/teetime/stage/string/buffer/StringBufferFilter.java @@ -13,22 +13,22 @@ * See the License for the specific language governing permissions and * limitations under the License. ***************************************************************************/ -package teetime.stage.stringBuffer; +package teetime.stage.string.buffer; import java.util.Collection; import java.util.LinkedList; import teetime.framework.AbstractConsumerStage; import teetime.framework.OutputPort; -import teetime.stage.stringBuffer.handler.AbstractDataTypeHandler; -import teetime.stage.stringBuffer.util.KiekerHashMap; +import teetime.stage.string.buffer.handler.AbstractDataTypeHandler; +import teetime.stage.string.buffer.util.KiekerHashMap; /** * @author Christian Wulf * * @since 1.10 */ -public class StringBufferFilter<T> extends AbstractConsumerStage<T> { +public final class StringBufferFilter<T> extends AbstractConsumerStage<T> { private final OutputPort<T> outputPort = this.createOutputPort(); @@ -40,7 +40,7 @@ public class StringBufferFilter<T> extends AbstractConsumerStage<T> { @Override protected void execute(final T element) { final T returnedElement = this.handle(element); - this.send(this.outputPort, returnedElement); + outputPort.send(returnedElement); } @Override diff --git a/src/main/java/teetime/stage/stringBuffer/handler/AbstractDataTypeHandler.java b/src/main/java/teetime/stage/string/buffer/handler/AbstractDataTypeHandler.java similarity index 93% rename from src/main/java/teetime/stage/stringBuffer/handler/AbstractDataTypeHandler.java rename to src/main/java/teetime/stage/string/buffer/handler/AbstractDataTypeHandler.java index f58688df027c7ab160fabf1a55ddb1f708e161b0..da82bc87a8adbb3625bf830e04196550194d24a1 100644 --- a/src/main/java/teetime/stage/stringBuffer/handler/AbstractDataTypeHandler.java +++ b/src/main/java/teetime/stage/string/buffer/handler/AbstractDataTypeHandler.java @@ -13,11 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. ***************************************************************************/ -package teetime.stage.stringBuffer.handler; +package teetime.stage.string.buffer.handler; import org.slf4j.Logger; -import teetime.stage.stringBuffer.util.KiekerHashMap; +import teetime.stage.string.buffer.util.KiekerHashMap; /** * @author Christian Wulf diff --git a/src/main/java/teetime/stage/stringBuffer/handler/StringHandler.java b/src/main/java/teetime/stage/string/buffer/handler/StringHandler.java similarity index 96% rename from src/main/java/teetime/stage/stringBuffer/handler/StringHandler.java rename to src/main/java/teetime/stage/string/buffer/handler/StringHandler.java index 82c356faf7727ec9d4b1521a9aceb8594837afca..83e28fa621ad595a9addca807a5a38cdd7f81c05 100644 --- a/src/main/java/teetime/stage/stringBuffer/handler/StringHandler.java +++ b/src/main/java/teetime/stage/string/buffer/handler/StringHandler.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. ***************************************************************************/ -package teetime.stage.stringBuffer.handler; +package teetime.stage.string.buffer.handler; /** * @author Christian Wulf diff --git a/src/main/java/teetime/stage/stringBuffer/util/KiekerHashMap.java b/src/main/java/teetime/stage/string/buffer/util/KiekerHashMap.java similarity index 99% rename from src/main/java/teetime/stage/stringBuffer/util/KiekerHashMap.java rename to src/main/java/teetime/stage/string/buffer/util/KiekerHashMap.java index 82667eed1ccd4e0a0579a55d2587391f9d1dbadb..259265abc4d2b84b560ffff2215b9c9ed4f1cad1 100644 --- a/src/main/java/teetime/stage/stringBuffer/util/KiekerHashMap.java +++ b/src/main/java/teetime/stage/string/buffer/util/KiekerHashMap.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. ***************************************************************************/ -package teetime.stage.stringBuffer.util; +package teetime.stage.string.buffer.util; import java.lang.ref.SoftReference; import java.util.concurrent.locks.ReentrantLock; diff --git a/src/main/java/teetime/framework/FileSearcher.java b/src/main/java/teetime/util/classpath/FileSearcher.java similarity index 95% rename from src/main/java/teetime/framework/FileSearcher.java rename to src/main/java/teetime/util/classpath/FileSearcher.java index 5902ba89fd2b954f4106d4277ad7dec62b54577d..7476a9b2101a1ec36e8c28dabaa73678925dd24c 100644 --- a/src/main/java/teetime/framework/FileSearcher.java +++ b/src/main/java/teetime/util/classpath/FileSearcher.java @@ -1,4 +1,4 @@ -package teetime.framework; +package teetime.util.classpath; import java.io.IOException; import java.net.URL; diff --git a/src/main/java/teetime/util/list/CommittableResizableArrayQueue.java b/src/main/java/teetime/util/list/CommittableResizableArrayQueue.java index d8929c3e9046d5a2321a078c50c2c46f3e41aaa3..2e31a320783e77547945c49329c6f6f8e5a5ac16 100644 --- a/src/main/java/teetime/util/list/CommittableResizableArrayQueue.java +++ b/src/main/java/teetime/util/list/CommittableResizableArrayQueue.java @@ -81,11 +81,11 @@ public class CommittableResizableArrayQueue<T> implements CommittableQueue<T> { this.replaceCurrentArrayBy(newElements); } - private void shrink() { - T[] newElements = this.arrayPool.acquire(this.elements.length / 2); - // System.out.println("shrink: " + this.lastFreeIndexUncommitted); - this.replaceCurrentArrayBy(newElements); - } + // private void shrink() { + // T[] newElements = this.arrayPool.acquire(this.elements.length / 2); + // // System.out.println("shrink: " + this.lastFreeIndexUncommitted); + // this.replaceCurrentArrayBy(newElements); + // } private final void replaceCurrentArrayBy(final T[] newElements) { this.copyArray(this.elements, newElements); diff --git a/src/main/java/util/test/PerformanceCheckProfileRepository.java b/src/main/java/util/test/PerformanceCheckProfileRepository.java index d16171a6a890c0aa155679a4e67497311e34ae78..603587585034c4b8453d14f77935c6dd271331c6 100644 --- a/src/main/java/util/test/PerformanceCheckProfileRepository.java +++ b/src/main/java/util/test/PerformanceCheckProfileRepository.java @@ -8,12 +8,11 @@ import org.slf4j.LoggerFactory; public class PerformanceCheckProfileRepository { - public static final PerformanceCheckProfileRepository INSTANCE = new PerformanceCheckProfileRepository(); - private static final Logger LOGGER = LoggerFactory.getLogger(PerformanceCheckProfileRepository.class); - private final Map<Class<?>, AbstractProfiledPerformanceAssertion> performanceCheckProfiles = new HashMap<Class<?>, AbstractProfiledPerformanceAssertion>(); + public static final PerformanceCheckProfileRepository INSTANCE = new PerformanceCheckProfileRepository(); + private final Map<Class<?>, AbstractProfiledPerformanceAssertion> performanceCheckProfiles = new HashMap<Class<?>, AbstractProfiledPerformanceAssertion>(); private String currentProfile; public PerformanceCheckProfileRepository() { diff --git a/src/main/java/util/test/PerformanceTest.java b/src/main/java/util/test/PerformanceTest.java index 66c435b8c4e76b53ebbfd6a89750380decb05724..031b4f62052ce12de8d16c146c461c4926abdbcc 100644 --- a/src/main/java/util/test/PerformanceTest.java +++ b/src/main/java/util/test/PerformanceTest.java @@ -28,7 +28,7 @@ public abstract class PerformanceTest { protected List<TimestampObject> timestampObjects; static { - System.setProperty("logback.configurationFile", "src/test/resources/logback-test.groovy"); + System.setProperty("logback.configurationFile", "src/test/resources/logback.groovy"); } @Rule diff --git a/src/main/resources/logback.groovy b/src/main/resources/logback.groovy index ab2f6629fddcb6776cdab7e2ff1ed23aeb391d85..371a869615c1d9ebe0097d7e8f02f5bb597806d2 100644 --- a/src/main/resources/logback.groovy +++ b/src/main/resources/logback.groovy @@ -3,7 +3,7 @@ import ch.qos.logback.classic.filter.ThresholdFilter statusListener(OnConsoleStatusListener) appender("FILE", FileAppender) { - file = "src/test/data/load-logs/timings-results.txt" + file = "teetime.log" append = false filter(ThresholdFilter) { level = INFO diff --git a/src/performancetest/java/teetime/examples/ComparisonMethodcallWithPorts.java b/src/performancetest/java/teetime/examples/ComparisonMethodcallWithPorts.java index b7e1ed78b3fa6231509cb388c79eb0c062928b19..3d45a40e9e8e9e2c99223cccb775ec4bdd294ef3 100644 --- a/src/performancetest/java/teetime/examples/ComparisonMethodcallWithPorts.java +++ b/src/performancetest/java/teetime/examples/ComparisonMethodcallWithPorts.java @@ -15,8 +15,8 @@ import teetime.examples.experiment15.MethodCallThoughputTimestampAnalysis15Test; import teetime.examples.experiment16.MethodCallThoughputTimestampAnalysis16Test; import teetime.examples.experiment17.MethodCallThoughputTimestampAnalysis17Test; import teetime.examples.experiment19.MethodCallThoughputTimestampAnalysis19Test; -import util.test.PerformanceCheckProfileRepository; import util.test.AbstractProfiledPerformanceAssertion; +import util.test.PerformanceCheckProfileRepository; @RunWith(Suite.class) @SuiteClasses({ @@ -34,7 +34,7 @@ public class ComparisonMethodcallWithPorts { @BeforeClass public static void beforeClass() { - System.setProperty("logback.configurationFile", "src/test/resources/logback-test.groovy"); + System.setProperty("logback.configurationFile", "src/test/resources/logback.groovy"); PerformanceCheckProfileRepository.INSTANCE.register(ComparisonMethodcallWithPorts.class, new ChwWorkComparisonMethodcallWithPorts()); PerformanceCheckProfileRepository.INSTANCE.register(ComparisonMethodcallWithPorts.class, new ChwHomeComparisonMethodcallWithPorts()); PerformanceCheckProfileRepository.INSTANCE.register(ComparisonMethodcallWithPorts.class, new NieWorkComparisonMethodcallWithPorts()); diff --git a/src/performancetest/java/teetime/examples/experiment09/ChwHomePerformanceCheck.java b/src/performancetest/java/teetime/examples/experiment09/ChwHomePerformanceCheck.java index 4ea53153b9f60c019437a9a90a6ac056121eb177..a1f37f2cb557f85656e5ca3a377b1a092c744b72 100644 --- a/src/performancetest/java/teetime/examples/experiment09/ChwHomePerformanceCheck.java +++ b/src/performancetest/java/teetime/examples/experiment09/ChwHomePerformanceCheck.java @@ -28,6 +28,8 @@ class ChwHomePerformanceCheck extends AbstractPerformanceCheck { // since 31.08.2014 (incl.) // assertEquals(44, medianSpeedup, 2.1); // +2 // since 04.11.2014 (incl.) - assertEquals(71, medianSpeedup, 2.1); // +33 + // assertEquals(71, medianSpeedup, 2.1); // +33 + // since 05.12.2014 (incl.) + assertEquals(45, medianSpeedup, 2.1); // -26 } } diff --git a/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java b/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java index dfdbfbd6df8aba627cc2e18fd5a95b6a0c01d50e..e5e38cab4f897c29787c68774b51c24d8f2d5b58 100644 --- a/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java +++ b/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java @@ -17,9 +17,9 @@ package teetime.examples.experiment09; import java.util.List; +import teetime.framework.Stage; import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableStage; -import teetime.framework.IStage; import teetime.framework.pipe.CommittablePipe; import teetime.stage.CollectorSink; import teetime.stage.NoopFilter; @@ -43,7 +43,7 @@ public class MethodCallThroughputAnalysis9 { private Runnable runnable; public void init() { - IStage pipeline = this.buildPipeline(); + Stage pipeline = this.buildPipeline(); this.runnable = new RunnableStage(pipeline); } diff --git a/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java b/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java index d95d9f923bd66ce2326846865c9d9dde9369bf62..ac70216dab166fb436ea93937ff58ba5f9a035f7 100644 --- a/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java +++ b/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java @@ -17,9 +17,9 @@ package teetime.examples.experiment11; import java.util.List; +import teetime.framework.Stage; import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableStage; -import teetime.framework.IStage; import teetime.framework.pipe.UnorderedGrowablePipe; import teetime.stage.CollectorSink; import teetime.stage.NoopFilter; @@ -43,7 +43,7 @@ public class MethodCallThroughputAnalysis11 { private Runnable runnable; public void init() { - IStage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator); + Stage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator); this.runnable = new RunnableStage(pipeline); } diff --git a/src/performancetest/java/teetime/examples/experiment14/ChwHomePerformanceCheck.java b/src/performancetest/java/teetime/examples/experiment14/ChwHomePerformanceCheck.java index e0b3ec95a3f3300a43f7afaea04b3a851e0f46a1..17cf68702214d3750685c595387703aef6bf2dc1 100644 --- a/src/performancetest/java/teetime/examples/experiment14/ChwHomePerformanceCheck.java +++ b/src/performancetest/java/teetime/examples/experiment14/ChwHomePerformanceCheck.java @@ -28,6 +28,8 @@ class ChwHomePerformanceCheck extends AbstractPerformanceCheck { // since 31.08.2014 (incl.) // assertEquals(62, medianSpeedup, 2.1); // -41 // since 04.11.2014 (incl.) - assertEquals(84, medianSpeedup, 2.1); // +22 + // assertEquals(84, medianSpeedup, 2.1); // +22 + // since 05.12.2014 (incl.) + assertEquals(75, medianSpeedup, 2.1); // -9 } } diff --git a/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java b/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java index 31f374d8dca46b0a9726ff18923438089faf214d..658ce7a02980b7127307373fc08df924317b01fe 100644 --- a/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java +++ b/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java @@ -17,9 +17,9 @@ package teetime.examples.experiment14; import java.util.List; +import teetime.framework.Stage; import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableStage; -import teetime.framework.IStage; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; @@ -47,7 +47,7 @@ public class MethodCallThroughputAnalysis14 { private final PipeFactoryRegistry pipeFactory = PipeFactoryRegistry.INSTANCE; public void init() { - IStage pipeline = this.buildPipeline(); + Stage pipeline = this.buildPipeline(); this.runnable = new RunnableStage(pipeline); } diff --git a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java index 738579aa22a0dffe2477c60b949f514a5890c805..f6c65eed58ff0f2c0910271db603c0b0be886636 100644 --- a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java +++ b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java @@ -17,9 +17,9 @@ package teetime.examples.experiment15; import java.util.List; +import teetime.framework.Stage; import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableStage; -import teetime.framework.IStage; import teetime.framework.pipe.OrderedGrowableArrayPipe; import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SpScPipe; @@ -58,7 +58,7 @@ public class MethodCallThroughputAnalysis15 { OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline(); this.clockRunnable = new RunnableStage(clockPipeline); - IStage pipeline = this.buildPipeline(this.clock); + Stage pipeline = this.buildPipeline(this.clock); this.runnable = new RunnableStage(pipeline); } diff --git a/src/performancetest/java/teetime/examples/experiment16/ChwHomePerformanceCheck.java b/src/performancetest/java/teetime/examples/experiment16/ChwHomePerformanceCheck.java index 8333ae8ca61b6379a3e71cb462e0acd707ef895f..11a831ba5613fa4df690dd01b812f1b2e7517f72 100644 --- a/src/performancetest/java/teetime/examples/experiment16/ChwHomePerformanceCheck.java +++ b/src/performancetest/java/teetime/examples/experiment16/ChwHomePerformanceCheck.java @@ -1,9 +1,9 @@ package teetime.examples.experiment16; import static org.junit.Assert.assertEquals; +import util.test.AbstractProfiledPerformanceAssertion; import util.test.PerformanceResult; import util.test.PerformanceTest; -import util.test.AbstractProfiledPerformanceAssertion; class ChwHomePerformanceCheck extends AbstractProfiledPerformanceAssertion { @@ -15,22 +15,25 @@ class ChwHomePerformanceCheck extends AbstractProfiledPerformanceAssertion { @Override public void check() { PerformanceResult test16a = PerformanceTest.measurementRepository.performanceResults - .get("testWithManyObjectsAnd1Thread(teetime.examples.experiment16.MethodCallThoughputTimestampAnalysis16Test)"); + .get("testWithManyObjectsAnd1Thread(" + MethodCallThoughputTimestampAnalysis16Test.class.getName() + ")"); PerformanceResult test16b = PerformanceTest.measurementRepository.performanceResults - .get("testWithManyObjectsAnd2Threads(teetime.examples.experiment16.MethodCallThoughputTimestampAnalysis16Test)"); + .get("testWithManyObjectsAnd2Threads(" + MethodCallThoughputTimestampAnalysis16Test.class.getName() + ")"); PerformanceResult test16c = PerformanceTest.measurementRepository.performanceResults - .get("testWithManyObjectsAnd4Threads(teetime.examples.experiment16.MethodCallThoughputTimestampAnalysis16Test)"); + .get("testWithManyObjectsAnd4Threads(" + MethodCallThoughputTimestampAnalysis16Test.class.getName() + ")"); // check speedup double speedupB = (double) test16a.overallDurationInNs / test16b.overallDurationInNs; double speedupC = (double) test16a.overallDurationInNs / test16c.overallDurationInNs; - System.out.println("speedupB: " + speedupB); - System.out.println("speedupC: " + speedupC); + System.out.println(ChwHomePerformanceCheck.class.getName() + ", speedupB: " + speedupB); + System.out.println(ChwHomePerformanceCheck.class.getName() + ", speedupC: " + speedupC); - assertEquals(2, speedupB, 0.3); + // assertEquals(2, speedupB, 0.3); // since 31.08.2014 (incl.) // assertEquals(3.6, speedupC, 0.3); // since 04.11.2014 (incl.) - assertEquals(5, speedupC, 0.3); + // assertEquals(5, speedupC, 0.4); + // since 07.12.2014 (incl.) + assertEquals(2, speedupB, 0.4); + assertEquals(5, speedupC, 0.4); } } diff --git a/src/performancetest/java/teetime/examples/experiment16/MethodCallThoughputTimestampAnalysis16Test.java b/src/performancetest/java/teetime/examples/experiment16/MethodCallThoughputTimestampAnalysis16Test.java index 2963e807901026df3d3af1330f6e8c01adca5c89..f6b61ff996bb0c27eeac2da4683ba3813b3cd586 100644 --- a/src/performancetest/java/teetime/examples/experiment16/MethodCallThoughputTimestampAnalysis16Test.java +++ b/src/performancetest/java/teetime/examples/experiment16/MethodCallThoughputTimestampAnalysis16Test.java @@ -24,19 +24,31 @@ import org.junit.runners.MethodSorters; import teetime.util.ConstructorClosure; import teetime.util.ListUtil; import teetime.util.TimestampObject; +import util.test.AbstractProfiledPerformanceAssertion; import util.test.PerformanceCheckProfileRepository; import util.test.PerformanceTest; -import util.test.AbstractProfiledPerformanceAssertion; /** * @author Christian Wulf * - * @since 1.10 + * @since 1.0 */ @FixMethodOrder(MethodSorters.NAME_ASCENDING) +// @RunWith(Parameterized.class) public class MethodCallThoughputTimestampAnalysis16Test extends PerformanceTest { - // TODO use @Parameter for the number of threads + // private final int numThreads; + // + // @Parameters + // public static Iterable<Object[]> data() { + // return Arrays.asList(new Object[][] { + // { 1 }, { 2 }, { 4 } + // }); + // } + // + // public MethodCallThoughputTimestampAnalysis16Test(final int numThreads) { + // this.numThreads = numThreads; + // } @BeforeClass public static void beforeClass() { @@ -52,17 +64,17 @@ public class MethodCallThoughputTimestampAnalysis16Test extends PerformanceTest @Test public void testWithManyObjectsAnd1Thread() { - this.performAnalysis(1); + performAnalysis(1); } @Test public void testWithManyObjectsAnd2Threads() { - this.performAnalysis(2); + performAnalysis(2); } @Test public void testWithManyObjectsAnd4Threads() { - this.performAnalysis(4); + performAnalysis(4); } private void performAnalysis(final int numThreads) { diff --git a/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java b/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java index 503e8b832082255ab6d1abf5813bde10c8e11ba2..60baf45d3935d88312bbc7ba788e351e9ba5981e 100644 --- a/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java +++ b/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java @@ -21,7 +21,7 @@ import java.util.List; import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableStage; -import teetime.framework.IStage; +import teetime.framework.Stage; import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.IPipe; import teetime.framework.pipe.PipeFactoryRegistry; @@ -34,8 +34,8 @@ import teetime.stage.ObjectProducer; import teetime.stage.Relay; import teetime.stage.StartTimestampFilter; import teetime.stage.StopTimestampFilter; -import teetime.stage.basic.Sink; import teetime.stage.basic.distributor.Distributor; +import teetime.stage.io.EveryXthPrinter; import teetime.util.ConstructorClosure; import teetime.util.TimestampObject; @@ -133,7 +133,7 @@ public class MethodCallThroughputAnalysis17 { * @param numNoopFilters * @since 1.10 */ - private OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(final IStage previousStage, + private OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(final Stage previousStage, final List<TimestampObject> timestampObjects) { // create stages Relay<TimestampObject> relay = new Relay<TimestampObject>(); @@ -144,6 +144,7 @@ public class MethodCallThroughputAnalysis17 { noopFilters[i] = new NoopFilter<TimestampObject>(); } final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); + EveryXthPrinter<TimestampObject> everyXthPrinter = new EveryXthPrinter<TimestampObject>(100000); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); IPipe startPipe = new RelayTestPipe<TimestampObject>(this.numInputObjects, this.inputObjectCreator); @@ -156,7 +157,8 @@ public class MethodCallThroughputAnalysis17 { UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); } UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); - UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); + UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), everyXthPrinter.getInputPort()); + UnorderedGrowablePipe.connect(everyXthPrinter.getNewOutputPort(), collectorSink.getInputPort()); final OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(relay); diff --git a/src/performancetest/java/teetime/examples/loopStage/Countdown.java b/src/performancetest/java/teetime/examples/loopStage/Countdown.java index bf045270ad614e6973274164cabc78ef11aa68c9..915837b5d69e59d137ae2186012ffb9f4fcc1e72 100644 --- a/src/performancetest/java/teetime/examples/loopStage/Countdown.java +++ b/src/performancetest/java/teetime/examples/loopStage/Countdown.java @@ -1,8 +1,8 @@ package teetime.examples.loopStage; +import teetime.framework.AbstractProducerStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.AbstractProducerStage; public class Countdown extends AbstractProducerStage<Void> { @@ -26,10 +26,10 @@ public class Countdown extends AbstractProducerStage<Void> { protected void execute() { Integer countdown = this.countdownInputPort.receive(); if (countdown == 0) { - this.send(this.outputPort, null); + outputPort.send(null); this.terminate(); } else { - this.send(this.newCountdownOutputPort, --countdown); + newCountdownOutputPort.send(--countdown); } } diff --git a/src/performancetest/java/teetime/framework/OldHeadPipeline.java b/src/performancetest/java/teetime/framework/OldHeadPipeline.java index 8f3418483e5c6e66ea7db42cc10cb55e59790c1d..eb61c5d100a01a68939eaddd377b2d6f39f9c679 100644 --- a/src/performancetest/java/teetime/framework/OldHeadPipeline.java +++ b/src/performancetest/java/teetime/framework/OldHeadPipeline.java @@ -1,7 +1,7 @@ package teetime.framework; @Deprecated -public class OldHeadPipeline<FirstStage extends IStage, LastStage extends IStage> extends OldPipeline<FirstStage, LastStage> implements IStage { +public final class OldHeadPipeline<FirstStage extends Stage, LastStage extends Stage> extends OldPipeline<FirstStage, LastStage> { public OldHeadPipeline() {} diff --git a/src/performancetest/java/teetime/framework/OldPipeline.java b/src/performancetest/java/teetime/framework/OldPipeline.java index 98f846d6b7d91465ee3909bf99a7ac0364f72781..56c8fffd59bc22e7b2a9efec73b9c83095ed502b 100644 --- a/src/performancetest/java/teetime/framework/OldPipeline.java +++ b/src/performancetest/java/teetime/framework/OldPipeline.java @@ -6,7 +6,7 @@ import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; @Deprecated -public class OldPipeline<FirstStage extends IStage, LastStage extends IStage> implements IStage { +public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> extends Stage { protected FirstStage firstStage; protected LastStage lastStage; @@ -27,26 +27,11 @@ public class OldPipeline<FirstStage extends IStage, LastStage extends IStage> im this.lastStage = lastStage; } - @Override - public String getId() { - return this.firstStage.getId(); - } - @Override public void executeWithPorts() { this.firstStage.executeWithPorts(); } - @Override - public IStage getParentStage() { - return this.firstStage.getParentStage(); - } - - @Override - public void setParentStage(final IStage parentStage, final int index) { - this.firstStage.setParentStage(parentStage, index); - } - @Override public void onSignal(final ISignal signal, final InputPort<?> inputPort) { this.firstStage.onSignal(signal, inputPort); @@ -59,20 +44,17 @@ public class OldPipeline<FirstStage extends IStage, LastStage extends IStage> im @Override public TerminationStrategy getTerminationStrategy() { - // TODO Auto-generated method stub - return null; + return firstStage.getTerminationStrategy(); } @Override public void terminate() { - // TODO Auto-generated method stub - + firstStage.terminate(); } @Override public boolean shouldBeTerminated() { - // TODO Auto-generated method stub - return false; + return firstStage.shouldBeTerminated(); } } diff --git a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java index d8c344e366736cb8fb67d9e30b6ac0705e438fce..a7af14594eaf8ae8ee0628b17b88e924dbe4a7ef 100644 --- a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java +++ b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java @@ -10,10 +10,10 @@ import teetime.stage.CipherByteArray; import teetime.stage.CipherByteArray.CipherMode; import teetime.stage.Counter; import teetime.stage.InitialElementProducer; -import teetime.stage.Tokenizer; import teetime.stage.ZipByteArray; import teetime.stage.ZipByteArray.ZipMode; import teetime.stage.io.File2ByteArray; +import teetime.stage.string.Tokenizer; public class TokenizerConfiguration extends AnalysisConfiguration { diff --git a/src/test/java/teetime/framework/pipe/SpScPipeTest.java b/src/test/java/teetime/framework/pipe/SpScPipeTest.java index b5f7a1281afa1c0a950dca112e742df61cfd22bb..20d366d6aea0079c67138c645e373f0f5a4f71e2 100644 --- a/src/test/java/teetime/framework/pipe/SpScPipeTest.java +++ b/src/test/java/teetime/framework/pipe/SpScPipeTest.java @@ -6,6 +6,7 @@ import java.util.List; import org.junit.Assert; import org.junit.Test; +import teetime.framework.AbstractInterThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.signal.ISignal; diff --git a/src/test/java/teetime/stage/basic/merger/MergerTest.java b/src/test/java/teetime/stage/basic/merger/MergerTest.java index a5d235ad159f3710c847139ccf3b936e1b685c77..7afbb35d67f3af495dde66bd87f0a91ee219fa0f 100644 --- a/src/test/java/teetime/stage/basic/merger/MergerTest.java +++ b/src/test/java/teetime/stage/basic/merger/MergerTest.java @@ -63,8 +63,24 @@ public class MergerTest { Assert.assertFalse(testPipe.startSent()); Assert.assertTrue(testPipe.terminateSent()); - merger.onSignal(new StartingSignal(), firstPort); + merger.onSignal(new StartingSignal(), secondPort); Assert.assertTrue(testPipe.startSent()); Assert.assertTrue(testPipe.terminateSent()); } + + @Test + public void testMultipleSignals() { + this.beforeSignalTesting(); + merger.onSignal(new StartingSignal(), firstPort); + Assert.assertFalse(testPipe.startSent()); + + merger.onSignal(new StartingSignal(), firstPort); + Assert.assertFalse(testPipe.startSent()); + + merger.onSignal(new StartingSignal(), firstPort); + Assert.assertFalse(testPipe.startSent()); + + merger.onSignal(new StartingSignal(), secondPort); + Assert.assertTrue(testPipe.startSent()); + } } diff --git a/src/test/java/teetime/framework/FileSearcherTest.java b/src/test/java/teetime/util/classpath/FileSearcherTest.java similarity index 90% rename from src/test/java/teetime/framework/FileSearcherTest.java rename to src/test/java/teetime/util/classpath/FileSearcherTest.java index 0b7085b4ee7ee557ebf139769b4a2b6e60216ee2..44ca4ad7dac3d2bf7bcde24825b2cd3d91838f8e 100644 --- a/src/test/java/teetime/framework/FileSearcherTest.java +++ b/src/test/java/teetime/util/classpath/FileSearcherTest.java @@ -1,4 +1,4 @@ -package teetime.framework; +package teetime.util.classpath; import java.io.IOException; import java.net.URL; @@ -7,6 +7,8 @@ import java.util.List; import org.junit.Assert; import org.junit.Test; +import teetime.util.classpath.FileSearcher; + public class FileSearcherTest { public FileSearcherTest() {} diff --git a/src/test/resources/logback.groovy b/src/test/resources/logback.groovy index 3b223177f95e3d1082fef40cc7cd53118215c041..a888fc48590a7920e3c82e1ad9ab1caf1c38bc8c 100644 --- a/src/test/resources/logback.groovy +++ b/src/test/resources/logback.groovy @@ -21,7 +21,7 @@ appender("CONSOLE", ConsoleAppender) { root ERROR, ["CONSOLE"] -logger "teetime.framework", INFO +//logger "teetime.framework", INFO logger "teetime.stage", INFO logger "util", INFO \ No newline at end of file