diff --git a/.settings/edu.umd.cs.findbugs.core.prefs b/.settings/edu.umd.cs.findbugs.core.prefs index e30d6d548a303760d31411b58f3f7cd8d05aed81..8aa5ab1dd1b71cc0d4168de5fe5a47e73dc45607 100644 --- a/.settings/edu.umd.cs.findbugs.core.prefs +++ b/.settings/edu.umd.cs.findbugs.core.prefs @@ -1,5 +1,5 @@ #FindBugs User Preferences -#Thu Jun 18 09:21:56 CEST 2015 +#Mon Jun 22 16:34:51 CEST 2015 detector_threshold=2 effort=max excludefilter0=.fbExcludeFilterFile|true diff --git a/pom.xml b/pom.xml index e4c191866892524a6fc518a6b2c2a3ca75b65e15..d6b8981a29c3922c1190b252166607f6137b8484 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ <groupId>net.sourceforge.teetime</groupId> <artifactId>teetime</artifactId> - <version>1.2-SNAPSHOT</version> + <version>2.0-SNAPSHOT</version> <packaging>jar</packaging> <name>TeeTime</name> diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 61b694ea33eae506552220642f122d535ee1b6c2..23c5758a7a62d1b93f768473b3066d374ba5c6a1 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -11,10 +11,21 @@ TeeTime automatically chooses the correct type of pipe for all connections. </action> + <action dev="ntd" type="add"> + Stages without any input port are + automatically executed in a dedicated thread. + </action> <action dev="ntd" type="fix" issue="93"> Introduced a new concept for composing stages. </action> + <action dev="ntd" type="add" issue="171"> + Configurations are now + built within an AnalysisContext which is passed on to nested + CompositeStages. + This removes any constraints on CompositeStages and + enables therefore multiple connections and multithreading. + </action> <action dev="ntd" type="remove"> Marked Pair class as deprecated. </action> diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 6623809dd09befc5ec8081bfd2a44342b9ff1488..59b8a5ef242a301beb788f1ba142c7fde144d365 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -23,8 +23,34 @@ package teetime.framework; * * */ -public abstract class AbstractCompositeStage extends AnalysisConfiguration { +public abstract class AbstractCompositeStage extends Configuration { - protected abstract Stage getFirstStage(); + private final ConfigurationContext context; + + public AbstractCompositeStage(final ConfigurationContext context) { + if (null == context) { + throw new IllegalArgumentException("Context may not be null."); + } + this.context = context; + } + + @Override + protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + context.connectPorts(sourcePort, targetPort, capacity); + } + + @Override + protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + connectPorts(sourcePort, targetPort, 4); + } + + @Override + protected void addThreadableStage(final Stage stage) { + context.addThreadableStage(stage); + } + + protected ConfigurationContext getContext() { + return context; + } } diff --git a/src/main/java/teetime/framework/Configuration.java b/src/main/java/teetime/framework/Configuration.java new file mode 100644 index 0000000000000000000000000000000000000000..9615e6b08857db181bb973fd4b22855e1d413a27 --- /dev/null +++ b/src/main/java/teetime/framework/Configuration.java @@ -0,0 +1,10 @@ +package teetime.framework; + +public abstract class Configuration { + + protected abstract <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity); + + protected abstract <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort); + + protected abstract void addThreadableStage(final Stage stage); +} diff --git a/src/main/java/teetime/framework/AnalysisConfiguration.java b/src/main/java/teetime/framework/ConfigurationContext.java similarity index 89% rename from src/main/java/teetime/framework/AnalysisConfiguration.java rename to src/main/java/teetime/framework/ConfigurationContext.java index 0d360b5b163b06a2d1c4b871ddd2d0a87749bb62..4e2f5f6acff99a2f951db19ec8f93e6a000fab7e 100644 --- a/src/main/java/teetime/framework/AnalysisConfiguration.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -18,6 +18,9 @@ package teetime.framework; import java.util.HashSet; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.InstantiationPipe; @@ -29,7 +32,9 @@ import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; * Represents a configuration of connected stages, which is needed to run a analysis. * Stages can be added by executing {@link #addThreadableStage(Stage)}. */ -public abstract class AnalysisConfiguration { +public abstract class ConfigurationContext extends Configuration { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class); private final Set<Stage> threadableStages = new HashSet<Stage>(); @@ -59,20 +64,10 @@ public abstract class AnalysisConfiguration { * @param stage * A arbitrary stage, which will be added to the configuration and executed in a thread. */ + @Override protected final void addThreadableStage(final Stage stage) { - this.threadableStages.add(stage); - } - - /** - * Execute this method, to add a CompositeStage to the configuration, which should be executed in a own thread. - * - * @param stage - * A arbitrary CompositeStage, which will be added to the configuration and executed in a thread. - */ - protected final void addThreadableStage(final AbstractCompositeStage stage) { - this.threadableStages.add(stage.getFirstStage()); - for (Stage threadableStage : stage.getThreadableStages()) { - this.addThreadableStage(threadableStage); + if (!this.threadableStages.add(stage)) { + LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage."); } } @@ -185,6 +180,7 @@ public abstract class AnalysisConfiguration { * @param <T> * the type of elements to be sent */ + @Override protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { connectPorts(sourcePort, targetPort, 4); } @@ -201,7 +197,15 @@ public abstract class AnalysisConfiguration { * @param <T> * the type of elements to be sent */ + @Override protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + if (sourcePort.getOwningStage().getInputPorts().length == 0 && !threadableStages.contains(sourcePort.getOwningStage())) { + addThreadableStage(sourcePort.getOwningStage()); + } + if (sourcePort.getPipe() != null || targetPort.getPipe() != null) { + LOGGER.warn("Overwriting existing pipe while connecting stages " + + sourcePort.getOwningStage().getId() + " and " + targetPort.getOwningStage().getId() + "."); + } new InstantiationPipe(sourcePort, targetPort, capacity); } diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Execution.java similarity index 86% rename from src/main/java/teetime/framework/Analysis.java rename to src/main/java/teetime/framework/Execution.java index a8fc0514bf9697495140cb317db28cace9af5ec8..042584b4d270d1407c907a77d0b9aa55156472f5 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Execution.java @@ -34,8 +34,8 @@ import teetime.framework.validation.AnalysisNotValidException; import teetime.util.Pair; /** - * Represents an Analysis to which stages can be added and executed later. - * This needs a {@link AnalysisConfiguration}, + * Represents an Execution to which stages can be added and executed later. + * This needs a {@link ConfigurationContext}, * in which the adding and configuring of stages takes place. * To start the analysis {@link #executeBlocking()} needs to be executed. * This class will automatically create threads and join them without any further commitment. @@ -43,11 +43,11 @@ import teetime.util.Pair; * @author Christian Wulf, Nelson Tavares de Sousa * * @param <T> - * the type of the {@link AnalysisConfiguration} + * the type of the {@link ConfigurationContext} */ -public final class Analysis<T extends AnalysisConfiguration> implements UncaughtExceptionHandler { +public final class Execution<T extends ConfigurationContext> implements UncaughtExceptionHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(Analysis.class); + private static final Logger LOGGER = LoggerFactory.getLogger(Execution.class); private final T configuration; @@ -64,32 +64,32 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); /** - * Creates a new {@link Analysis} that skips validating the port connections and uses the default listener. + * Creates a new {@link Execution} that skips validating the port connections and uses the default listener. * * @param configuration * to be used for the analysis */ - public Analysis(final T configuration) { + public Execution(final T configuration) { this(configuration, false, new IgnoringExceptionListenerFactory()); } - public Analysis(final T configuration, final boolean validationEnabled) { + public Execution(final T configuration, final boolean validationEnabled) { this(configuration, validationEnabled, new IgnoringExceptionListenerFactory()); } /** - * Creates a new {@link Analysis} that skips validating the port connections and uses a specific listener. + * Creates a new {@link Execution} that skips validating the port connections and uses a specific listener. * * @param configuration * to be used for the analysis * @param factory * specific listener for the exception handling */ - public Analysis(final T configuration, final IExceptionListenerFactory factory) { + public Execution(final T configuration, final IExceptionListenerFactory factory) { this(configuration, false, factory); } - public Analysis(final T configuration, final boolean validationEnabled, final IExceptionListenerFactory factory) { + public Execution(final T configuration, final boolean validationEnabled, final IExceptionListenerFactory factory) { this.configuration = configuration; this.factory = factory; if (validationEnabled) { @@ -118,8 +118,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught * */ private final void init() { - AnalysisInstantiation analysisInstantiation = new AnalysisInstantiation(configuration); - analysisInstantiation.instantiatePipes(); + ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configuration); + executionInstantiation.instantiatePipes(); final Set<Stage> threadableStageJobs = this.configuration.getThreadableStages(); if (threadableStageJobs.isEmpty()) { @@ -194,7 +194,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught /** * Calling this method will block the current thread, until the analysis terminates. * - * @throws AnalysisException + * @throws ExecutionException * if at least one exception in one thread has occurred within the analysis. The exception contains the pairs of thread and throwable * * @since 1.1 @@ -209,7 +209,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught thread.join(); } } catch (InterruptedException e) { - LOGGER.error("Analysis has stopped unexpectedly", e); + LOGGER.error("Execution has stopped unexpectedly", e); for (Thread thread : this.finiteProducerThreads) { thread.interrupt(); } @@ -224,7 +224,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } if (!exceptions.isEmpty()) { - throw new AnalysisException(exceptions); + throw new ExecutionException(exceptions); } } @@ -244,9 +244,9 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } /** - * This method will start the Analysis and block until it is finished. + * This method will start the Execution and block until it is finished. * - * @throws AnalysisException + * @throws ExecutionException * if at least one exception in one thread has occurred within the analysis. The exception contains the pairs of thread and throwable. * * @since 1.1 @@ -285,9 +285,9 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } /** - * Retrieves the Configuration which was used to add and arrange all stages needed for the Analysis + * Retrieves the Configuration which was used to add and arrange all stages needed for the Execution * - * @return the configuration used for the Analysis + * @return the configuration used for the Execution */ public T getConfiguration() { return this.configuration; diff --git a/src/main/java/teetime/framework/AnalysisException.java b/src/main/java/teetime/framework/ExecutionException.java similarity index 91% rename from src/main/java/teetime/framework/AnalysisException.java rename to src/main/java/teetime/framework/ExecutionException.java index 90fcdcf73d6bc1f2c48f95efa2480b441fbf04d0..0f123784ed18b778d269b83eada2e19c7bf4ada8 100644 --- a/src/main/java/teetime/framework/AnalysisException.java +++ b/src/main/java/teetime/framework/ExecutionException.java @@ -25,7 +25,7 @@ import teetime.util.Pair; * * @since 1.1 */ -public class AnalysisException extends RuntimeException { +public class ExecutionException extends RuntimeException { /** * @@ -34,7 +34,7 @@ public class AnalysisException extends RuntimeException { private final Collection<Pair<Thread, Throwable>> exceptions; - public AnalysisException(final Collection<Pair<Thread, Throwable>> exceptions) { + public ExecutionException(final Collection<Pair<Thread, Throwable>> exceptions) { super("Error(s) while running analysis. Check thrown exceptions."); this.exceptions = exceptions; } diff --git a/src/main/java/teetime/framework/AnalysisInstantiation.java b/src/main/java/teetime/framework/ExecutionInstantiation.java similarity index 71% rename from src/main/java/teetime/framework/AnalysisInstantiation.java rename to src/main/java/teetime/framework/ExecutionInstantiation.java index d1283e22f181038399fd2bfb9f770bb1da8594be..8a8de620134b47b154abcec31ee1f6b41861e43a 100644 --- a/src/main/java/teetime/framework/AnalysisInstantiation.java +++ b/src/main/java/teetime/framework/ExecutionInstantiation.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package teetime.framework; import java.util.HashMap; @@ -13,22 +28,22 @@ import teetime.framework.pipe.SingleElementPipeFactory; import teetime.framework.pipe.SpScPipeFactory; import teetime.framework.pipe.UnboundedSpScPipeFactory; -class AnalysisInstantiation { +class ExecutionInstantiation { - private static final Logger LOGGER = LoggerFactory.getLogger(AnalysisInstantiation.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionInstantiation.class); private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); - private final AnalysisConfiguration configuration; + private final ConfigurationContext configuration; - public AnalysisInstantiation(final AnalysisConfiguration configuration) { + public ExecutionInstantiation(final ConfigurationContext configuration) { this.configuration = configuration; } @SuppressWarnings("rawtypes") - Integer colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage, final AnalysisConfiguration configuration) { + Integer colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage, final ConfigurationContext configuration) { Integer createdConnections = new Integer(0); Set<Stage> threadableStageJobs = configuration.getThreadableStages(); for (OutputPort outputPort : threadableStage.getOutputPorts()) { @@ -74,7 +89,7 @@ class AnalysisInstantiation { colors.put(threadableStage, i); createdConnections = colorAndConnectStages(i, colors, threadableStage, configuration); } - LOGGER.debug("Created " + createdConnections + "connections"); + LOGGER.debug("Created " + createdConnections + " connections"); } } diff --git a/src/main/java/teetime/framework/pipe/InstantiationPipe.java b/src/main/java/teetime/framework/pipe/InstantiationPipe.java index 7e5d4e0f5de9a9ceeab3a91219c384447adc8cbc..b922f15ec7978fa255fa9aae4d8c2a4d99ebe6dd 100644 --- a/src/main/java/teetime/framework/pipe/InstantiationPipe.java +++ b/src/main/java/teetime/framework/pipe/InstantiationPipe.java @@ -21,95 +21,85 @@ import teetime.framework.signal.ISignal; public class InstantiationPipe implements IPipe { - private final InputPort<?> target; + private static final String ERROR_MESSAGE = "This must not be called while executing the configuration"; + + private final InputPort<?> targetPort; private final int capacity; public <T> InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - this.target = targetPort; + this.targetPort = targetPort; this.capacity = capacity; sourcePort.setPipe(this); + targetPort.setPipe(this); } public int getCapacity() { return capacity; } + @Override + public InputPort<?> getTargetPort() { + return this.targetPort; + } + @Override public boolean add(final Object element) { - // TODO Auto-generated method stub - return false; + throw new IllegalStateException(ERROR_MESSAGE); } @Override public boolean addNonBlocking(final Object element) { - // TODO Auto-generated method stub - return false; + throw new IllegalStateException(ERROR_MESSAGE); } @Override public boolean isEmpty() { - // TODO Auto-generated method stub - return false; + throw new IllegalStateException(ERROR_MESSAGE); } @Override public int size() { - // TODO Auto-generated method stub - return 0; + throw new IllegalStateException(ERROR_MESSAGE); } @Override public Object removeLast() { - // TODO Auto-generated method stub - return null; - } - - @Override - public InputPort<?> getTargetPort() { - // TODO Auto-generated method stub - return this.target; + throw new IllegalStateException(ERROR_MESSAGE); } @Override public void sendSignal(final ISignal signal) { - // TODO Auto-generated method stub - + throw new IllegalStateException(ERROR_MESSAGE); } @Override public void reportNewElement() { - // TODO Auto-generated method stub - + throw new IllegalStateException(ERROR_MESSAGE); } @Override public boolean isClosed() { - // TODO Auto-generated method stub - return false; + throw new IllegalStateException(ERROR_MESSAGE); } @Override public boolean hasMore() { - // TODO Auto-generated method stub - return false; + throw new IllegalStateException(ERROR_MESSAGE); } @Override public void waitForStartSignal() throws InterruptedException { - // TODO Auto-generated method stub - + throw new IllegalStateException(ERROR_MESSAGE); } @Override public void waitForInitializingSignal() throws InterruptedException { - // TODO Auto-generated method stub - + throw new IllegalStateException(ERROR_MESSAGE); } @Override public void close() { - // TODO Auto-generated method stub - + throw new IllegalStateException(ERROR_MESSAGE); } } diff --git a/src/main/java/teetime/framework/test/StageTester.java b/src/main/java/teetime/framework/test/StageTester.java index 59092010e96fdb08cf4e783d8aae1143eb317313..b9c55537c3b45ea6f42f5064b7aa1fd2120f5bf5 100644 --- a/src/main/java/teetime/framework/test/StageTester.java +++ b/src/main/java/teetime/framework/test/StageTester.java @@ -19,9 +19,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import teetime.framework.Analysis; -import teetime.framework.AnalysisConfiguration; -import teetime.framework.AnalysisException; +import teetime.framework.ConfigurationContext; +import teetime.framework.Execution; +import teetime.framework.ExecutionException; import teetime.framework.Stage; import teetime.framework.StageState; import teetime.stage.CollectorSink; @@ -72,24 +72,23 @@ public final class StageTester { /** * This method will start the test and block until it is finished. * - * @throws AnalysisException + * @throws ExecutionException * if at least one exception in one thread has occurred within the analysis. * The exception contains the pairs of thread and throwable. * */ public void start() { - final AnalysisConfiguration configuration = new Configuration(inputHolders, stage, outputHolders); - final Analysis<AnalysisConfiguration> analysis = new Analysis<AnalysisConfiguration>(configuration); + final ConfigurationContext configuration = new Configuration(inputHolders, stage, outputHolders); + final Execution<ConfigurationContext> analysis = new Execution<ConfigurationContext>(configuration); analysis.executeBlocking(); } - private final class Configuration extends AnalysisConfiguration { + private final class Configuration extends ConfigurationContext { public Configuration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) { for (InputHolder<?> inputHolder : inputHolders) { final InitialElementProducer<Object> producer = new InitialElementProducer<Object>(inputHolder.getInput()); connectPorts(producer.getOutputPort(), inputHolder.getPort()); - addThreadableStage(producer); } addThreadableStage(stage); diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java index c074dfd5c59835131b132d859d7a5a145bf0c23c..24eb08eca3008c315395eeea6bf9fa1735e86d64 100644 --- a/src/main/java/teetime/stage/io/EveryXthPrinter.java +++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.List; import teetime.framework.AbstractCompositeStage; +import teetime.framework.ConfigurationContext; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.Stage; @@ -31,7 +32,8 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage { private final Distributor<T> distributor; private final List<Stage> lastStages = new ArrayList<Stage>(); - public EveryXthPrinter(final int threshold) { + public EveryXthPrinter(final int threshold, final ConfigurationContext context) { + super(context); distributor = new Distributor<T>(new CopyByReferenceStrategy()); EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold); Printer<Integer> printer = new Printer<Integer>(); @@ -50,8 +52,7 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage { return distributor.getNewOutputPort(); } - @Override - protected Stage getFirstStage() { + public Stage getFirstStage() { return distributor; } diff --git a/src/main/java/teetime/stage/string/WordCounter.java b/src/main/java/teetime/stage/string/WordCounter.java index 9c632b22726191726dd64af10c92018f8f9958ae..08e10ed9de237288fd615d62676d7e7b01d01e60 100644 --- a/src/main/java/teetime/stage/string/WordCounter.java +++ b/src/main/java/teetime/stage/string/WordCounter.java @@ -18,6 +18,7 @@ package teetime.stage.string; import java.util.ArrayList; import teetime.framework.AbstractCompositeStage; +import teetime.framework.ConfigurationContext; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.Stage; @@ -41,7 +42,8 @@ public final class WordCounter extends AbstractCompositeStage { private final ArrayList<Stage> lastStages = new ArrayList<Stage>(); // The connection of the different stages is realized within the construction of a instance of this class. - public WordCounter() { + public WordCounter(final ConfigurationContext context) { + super(context); this.lastStages.add(this.mapCounter); final ToLowerCase toLowerCase = new ToLowerCase(); @@ -50,8 +52,7 @@ public final class WordCounter extends AbstractCompositeStage { // connectStages(wordcharacterFilter.getOutputPort(), this.mapCounter.getInputPort()); } - @Override - protected Stage getFirstStage() { + public Stage getFirstStage() { return this.tokenizer; } diff --git a/src/test/java/teetime/examples/cipher/CipherConfiguration.java b/src/test/java/teetime/examples/cipher/CipherConfiguration.java index 95ba33f4357662d7014573bfe67f6f5d810582f7..7f46f37b9b21359fc9995d38d17b766d259cb3ce 100644 --- a/src/test/java/teetime/examples/cipher/CipherConfiguration.java +++ b/src/test/java/teetime/examples/cipher/CipherConfiguration.java @@ -17,7 +17,7 @@ package teetime.examples.cipher; import java.io.File; -import teetime.framework.AnalysisConfiguration; +import teetime.framework.ConfigurationContext; import teetime.stage.CipherStage; import teetime.stage.CipherStage.CipherMode; import teetime.stage.InitialElementProducer; @@ -26,7 +26,7 @@ import teetime.stage.ZipByteArray.ZipMode; import teetime.stage.io.ByteArrayFileWriter; import teetime.stage.io.File2ByteArray; -public class CipherConfiguration extends AnalysisConfiguration { +public class CipherConfiguration extends ConfigurationContext { public CipherConfiguration(final String inputFile, final String outputFile, final String password) { final File input = new File(inputFile); @@ -47,8 +47,5 @@ public class CipherConfiguration extends AnalysisConfiguration { connectPorts(decomp.getOutputPort(), decrypt.getInputPort()); connectPorts(decrypt.getOutputPort(), writer.getInputPort()); - // this.getFiniteProducerStages().add(init); - this.addThreadableStage(init); - } } diff --git a/src/test/java/teetime/examples/cipher/CipherTest.java b/src/test/java/teetime/examples/cipher/CipherTest.java index ac03de784a47b3a892d4ecdf9724898965a0ff1f..ca53aa4a0585c47be782d9a68c2ffeb6bdd5a4a2 100644 --- a/src/test/java/teetime/examples/cipher/CipherTest.java +++ b/src/test/java/teetime/examples/cipher/CipherTest.java @@ -21,8 +21,8 @@ import java.io.IOException; import org.junit.Assert; import org.junit.Test; -import teetime.framework.Analysis; -import teetime.framework.AnalysisConfiguration; +import teetime.framework.Execution; +import teetime.framework.ConfigurationContext; import com.google.common.io.Files; @@ -43,9 +43,9 @@ public class CipherTest { final String outputFile = "src/test/resources/data/output.txt"; final String password = "Password"; - final AnalysisConfiguration configuration = new CipherConfiguration(inputFile, outputFile, password); - final Analysis analysis = new Analysis(configuration); - analysis.executeBlocking(); + final ConfigurationContext configuration = new CipherConfiguration(inputFile, outputFile, password); + final Execution execution = new Execution(configuration); + execution.executeBlocking(); Assert.assertTrue(Files.equal(new File(inputFile), new File(outputFile))); } diff --git a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java index 15c325dad0a5cca3f7df333d092a8444ec16af55..abafb20ba5a1f71cdb522769d18077a4fb81e695 100644 --- a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java +++ b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java @@ -17,7 +17,7 @@ package teetime.examples.tokenizer; import java.io.File; -import teetime.framework.AnalysisConfiguration; +import teetime.framework.ConfigurationContext; import teetime.stage.ByteArray2String; import teetime.stage.CipherStage; import teetime.stage.CipherStage.CipherMode; @@ -28,7 +28,7 @@ import teetime.stage.ZipByteArray.ZipMode; import teetime.stage.io.File2ByteArray; import teetime.stage.string.Tokenizer; -public class TokenizerConfiguration extends AnalysisConfiguration { +public class TokenizerConfiguration extends ConfigurationContext { private final Counter<String> counter; @@ -50,7 +50,6 @@ public class TokenizerConfiguration extends AnalysisConfiguration { connectPorts(b2s.getOutputPort(), tokenizer.getInputPort()); connectPorts(tokenizer.getOutputPort(), this.counter.getInputPort()); - this.addThreadableStage(init); } public int getTokenCount() { diff --git a/src/test/java/teetime/examples/tokenizer/TokenizerTest.java b/src/test/java/teetime/examples/tokenizer/TokenizerTest.java index 070c722444cec08e8870f05a6eb0e8512ba4e024..c78d69698e4e32505d7118c5212e0484332431e1 100644 --- a/src/test/java/teetime/examples/tokenizer/TokenizerTest.java +++ b/src/test/java/teetime/examples/tokenizer/TokenizerTest.java @@ -22,7 +22,7 @@ import java.nio.charset.Charset; import org.junit.Assert; import org.junit.Test; -import teetime.framework.Analysis; +import teetime.framework.Execution; import com.google.common.io.Files; @@ -42,8 +42,8 @@ public class TokenizerTest { final String password = "Password"; final TokenizerConfiguration configuration = new TokenizerConfiguration(inputFile, password); - final Analysis analysis = new Analysis(configuration); - analysis.executeBlocking(); + final Execution execution = new Execution(configuration); + execution.executeBlocking(); final String string = Files.toString(new File("src/test/resources/data/input.txt"), Charset.forName("UTF-8")); diff --git a/src/test/java/teetime/framework/AnalysisTest.java b/src/test/java/teetime/framework/ExecutionTest.java similarity index 72% rename from src/test/java/teetime/framework/AnalysisTest.java rename to src/test/java/teetime/framework/ExecutionTest.java index 5c347dfe763eaa8627db1930e973dcf26eac1594..758dcfb1221b12ca368821057712af080c844d77 100644 --- a/src/test/java/teetime/framework/AnalysisTest.java +++ b/src/test/java/teetime/framework/ExecutionTest.java @@ -33,51 +33,50 @@ import teetime.stage.InstanceOfFilter; import teetime.stage.basic.Sink; import teetime.util.StopWatch; -public class AnalysisTest { +public class ExecutionTest { private static final long DELAY_IN_MS = 500; private static final long ABSOLUTE_MAX_ERROR_IN_MS = 2; - private Analysis<TestConfig> analysis; + private Execution<TestConfig> execution; @Before public void before() { TestConfig configuration = new TestConfig(); - analysis = new Analysis<TestConfig>(configuration); + execution = new Execution<TestConfig>(configuration); } @Test public void testExecuteNonBlocking() throws Exception { StopWatch watch = new StopWatch(); watch.start(); - analysis.executeNonBlocking(); + execution.executeNonBlocking(); watch.end(); assertThat(watch.getDurationInMs(), is(lessThan(DELAY_IN_MS))); - assertFalse(analysis.getConfiguration().delay.finished); + assertFalse(execution.getConfiguration().delay.finished); - analysis.waitForTermination(); - assertTrue(analysis.getConfiguration().delay.finished); + execution.waitForTermination(); + assertTrue(execution.getConfiguration().delay.finished); } @Test public void testExecuteBlocking() { StopWatch watch = new StopWatch(); watch.start(); - analysis.executeBlocking(); + execution.executeBlocking(); watch.end(); assertThat(watch.getDurationInMs() + ABSOLUTE_MAX_ERROR_IN_MS, is(greaterThanOrEqualTo(DELAY_IN_MS))); } - private static class TestConfig extends AnalysisConfiguration { + private static class TestConfig extends ConfigurationContext { public final DelayAndTerminate delay; public TestConfig() { final InitialElementProducer<String> init = new InitialElementProducer<String>("Hello"); delay = new DelayAndTerminate(DELAY_IN_MS); connectPorts(init.getOutputPort(), delay.getInputPort()); - addThreadableStage(init); } } @@ -105,20 +104,19 @@ public class AnalysisTest { @Test public void testInstantiatePipes() throws Exception { - Analysis<AnalysisTestConfig> interAnalysis = new Analysis<AnalysisTestConfig>(new AnalysisTestConfig(true)); + Execution<AnalysisTestConfig> interAnalysis = new Execution<AnalysisTestConfig>(new AnalysisTestConfig(true)); assertThat(interAnalysis.getConfiguration().init.getOwningThread(), is(not(interAnalysis.getConfiguration().sink.getOwningThread()))); - Analysis<AnalysisTestConfig> intraAnalysis = new Analysis<AnalysisTestConfig>(new AnalysisTestConfig(false)); + Execution<AnalysisTestConfig> intraAnalysis = new Execution<AnalysisTestConfig>(new AnalysisTestConfig(false)); assertThat(intraAnalysis.getConfiguration().init.getOwningThread(), is(intraAnalysis.getConfiguration().sink.getOwningThread())); } - private class AnalysisTestConfig extends AnalysisConfiguration { + private class AnalysisTestConfig extends ConfigurationContext { public InitialElementProducer<Object> init = new InitialElementProducer<Object>(); public Sink<Object> sink = new Sink<Object>(); public AnalysisTestConfig(final boolean inter) { connectPorts(init.getOutputPort(), sink.getInputPort()); - addThreadableStage(init); if (inter) { addThreadableStage(sink); } @@ -133,10 +131,10 @@ public class AnalysisTest { thrown.expect(IllegalStateException.class); thrown.expectMessage("Crossing threads"); InvalidTestConfig configuration = new InvalidTestConfig(); - new Analysis<InvalidTestConfig>(configuration); + new Execution<InvalidTestConfig>(configuration); } - private class InvalidTestConfig extends AnalysisConfiguration { + private class InvalidTestConfig extends ConfigurationContext { public InitialElementProducer<Object> init = new InitialElementProducer<Object>(); public InstanceOfFilter<Object, Object> iof = new InstanceOfFilter<Object, Object>(Object.class); public Sink<Object> sink = new Sink<Object>(); @@ -145,9 +143,37 @@ public class AnalysisTest { connectPorts(init.getOutputPort(), iof.getInputPort()); connectPorts(iof.getMatchedOutputPort(), sink.getInputPort()); connectPorts(init.createOutputPort(), sink.createInputPort()); - addThreadableStage(init); addThreadableStage(iof); } } + @Test + public void automaticallyAddHeadStages() { + AutomaticallyConfig context = new AutomaticallyConfig(); + new Execution<ConfigurationContext>(context).executeBlocking(); + assertTrue(context.executed); + } + + private class AutomaticallyConfig extends ConfigurationContext { + + public boolean executed; + + public AutomaticallyConfig() { + AutomaticallyAddedStage aas = new AutomaticallyAddedStage(); + Sink<Object> sink = new Sink<Object>(); + connectPorts(aas.getOutputPort(), sink.getInputPort()); + } + + private class AutomaticallyAddedStage extends AbstractProducerStage<Object> { + + @Override + protected void execute() { + executed = true; + terminate(); + } + + } + + } + } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTest.java b/src/test/java/teetime/framework/RunnableConsumerStageTest.java index 5be8ef32e1ace9e38c468d41984df0b5bc016ecc..b74a255ecdbaa3c2633fd2e83d39e4f556815091 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTest.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTest.java @@ -34,11 +34,11 @@ public class RunnableConsumerStageTest { public void testWaitingInfinitely() throws Exception { RunnableConsumerStageTestConfiguration configuration = new RunnableConsumerStageTestConfiguration(); - final Analysis analysis = new Analysis(configuration); + final Execution execution = new Execution(configuration); final Thread thread = new Thread(new Runnable() { @Override public void run() { - start(analysis); + start(execution); } }); thread.start(); @@ -59,8 +59,8 @@ public class RunnableConsumerStageTest { public void testCorrectStartAndTerminatation() throws Exception { RunnableConsumerStageTestConfiguration configuration = new RunnableConsumerStageTestConfiguration(0, 1, 2, 3, 5); - final Analysis analysis = new Analysis(configuration); - start(analysis); + final Execution execution = new Execution(configuration); + start(execution); assertEquals(5, configuration.getCollectedElements().size()); } @@ -69,7 +69,7 @@ public class RunnableConsumerStageTest { // public void testWaitingInfinitely() throws Exception { // WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 42); // - // final Analysis analysis = new Analysis(waitStrategyConfiguration); + // final Execution analysis = new Execution(waitStrategyConfiguration); // Thread thread = new Thread(new Runnable() { // @Override // public void run() { @@ -88,7 +88,7 @@ public class RunnableConsumerStageTest { // public void testWaitingFinitely() throws Exception { // WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 42); // - // final Analysis analysis = new Analysis(waitStrategyConfiguration); + // final Execution analysis = new Execution(waitStrategyConfiguration); // Thread thread = new Thread(new Runnable() { // @Override // public void run() { @@ -109,19 +109,19 @@ public class RunnableConsumerStageTest { public void testYieldRun() throws Exception { YieldStrategyConfiguration waitStrategyConfiguration = new YieldStrategyConfiguration(42); - final Analysis analysis = new Analysis(waitStrategyConfiguration); + final Execution execution = new Execution(waitStrategyConfiguration); - start(analysis); + start(execution); assertEquals(42, waitStrategyConfiguration.getCollectorSink().getElements().get(0)); assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().size()); } - private void start(final Analysis analysis) { + private void start(final Execution execution) { Collection<Pair<Thread, Throwable>> exceptions = new ArrayList<Pair<Thread, Throwable>>(); try { - analysis.executeBlocking(); - } catch (AnalysisException e) { + execution.executeBlocking(); + } catch (ExecutionException e) { exceptions = e.getThrownExceptions(); } for (Pair<Thread, Throwable> pair : exceptions) { diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index 74d0cd73531a604c7170771fb699788ae80cbde0..904cae287a8404ed1d314edbc1d6f89e8332bd5b 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -21,7 +21,7 @@ import java.util.List; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; -public class RunnableConsumerStageTestConfiguration extends AnalysisConfiguration { +public class RunnableConsumerStageTestConfiguration extends ConfigurationContext { private final List<Integer> collectedElements = new ArrayList<Integer>(); private final CollectorSink<Integer> collectorSink; diff --git a/src/test/java/teetime/framework/StageTest.java b/src/test/java/teetime/framework/StageTest.java index 19580dcb72e9573163c2c40755d4f18a45b60e4c..c8cf92c2d4a5543de9b6c3c31d00f81f991c6b5b 100644 --- a/src/test/java/teetime/framework/StageTest.java +++ b/src/test/java/teetime/framework/StageTest.java @@ -47,13 +47,13 @@ public class StageTest { @Test public void testSetOwningThread() throws Exception { TestConfig tc = new TestConfig(); - new Analysis<TestConfig>(tc); + new Execution<TestConfig>(tc); assertEquals(tc.init.owningThread, tc.delay.owningThread); assertThat(tc.delay.exceptionHandler, is(notNullValue())); assertEquals(tc.init.exceptionHandler, tc.delay.exceptionHandler); } - private static class TestConfig extends AnalysisConfiguration { + private static class TestConfig extends ConfigurationContext { public final DelayAndTerminate delay; public InitialElementProducer<String> init; @@ -61,7 +61,6 @@ public class StageTest { init = new InitialElementProducer<String>("Hello"); delay = new DelayAndTerminate(0); connectPorts(init.getOutputPort(), delay.getInputPort()); - addThreadableStage(init); } } diff --git a/src/test/java/teetime/framework/TraversorTest.java b/src/test/java/teetime/framework/TraversorTest.java index f895104874d57a005aa66e0fa6788c228b36747f..782fc8cfafb85c3ff2538ec54f9dcf71d739bf8e 100644 --- a/src/test/java/teetime/framework/TraversorTest.java +++ b/src/test/java/teetime/framework/TraversorTest.java @@ -42,7 +42,7 @@ public class TraversorTest { @Test public void traverse() { TestConfiguration tc = new TestConfiguration(); - new Analysis<TestConfiguration>(tc); + new Execution<TestConfiguration>(tc); traversor.traverse(tc.init); Set<Stage> comparingStages = new HashSet<Stage>(); comparingStages.add(tc.init); @@ -53,7 +53,7 @@ public class TraversorTest { } // WordCounterConfiguration - private class TestConfiguration extends AnalysisConfiguration { + private class TestConfiguration extends ConfigurationContext { public final CountingMapMerger<String> result = new CountingMapMerger<String>(); public final InitialElementProducer<File> init; @@ -77,13 +77,13 @@ public class TraversorTest { // Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages for (int i = 0; i < threads; i++) { // final InputPortSizePrinter<String> inputPortSizePrinter = new InputPortSizePrinter<String>(); - final WordCounter wc = new WordCounter(); + final WordCounter wc = new WordCounter(this); // intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort()); connectPorts(distributor.getNewOutputPort(), wc.getInputPort()); connectPorts(wc.getOutputPort(), merger.getNewInputPort()); // Add WordCounter as a threadable stage, so it runs in its own thread - addThreadableStage(wc); + addThreadableStage(wc.getFirstStage()); } @@ -91,7 +91,6 @@ public class TraversorTest { connectPorts(merger.getOutputPort(), result.getInputPort()); // Add the first and last part to the threadable stages - addThreadableStage(init); addThreadableStage(merger); } diff --git a/src/test/java/teetime/framework/WaitStrategyConfiguration.java b/src/test/java/teetime/framework/WaitStrategyConfiguration.java index 191f8460e6f68a04007a05f7cc29338e3c2789f2..5ca67a386f4ed103dfe9370ed92b5142a5515ce4 100644 --- a/src/test/java/teetime/framework/WaitStrategyConfiguration.java +++ b/src/test/java/teetime/framework/WaitStrategyConfiguration.java @@ -21,7 +21,7 @@ import teetime.stage.InitialElementProducer; import teetime.stage.Relay; import teetime.stage.basic.Delay; -class WaitStrategyConfiguration extends AnalysisConfiguration { +class WaitStrategyConfiguration extends ConfigurationContext { private Delay<Object> delay; private CollectorSink<Object> collectorSink; diff --git a/src/test/java/teetime/framework/YieldStrategyConfiguration.java b/src/test/java/teetime/framework/YieldStrategyConfiguration.java index a63025947a38b2db8f3e12e3bad6c1637d52fa8a..b19df100b246d87c72d1e76ae67c267ef99d2f4e 100644 --- a/src/test/java/teetime/framework/YieldStrategyConfiguration.java +++ b/src/test/java/teetime/framework/YieldStrategyConfiguration.java @@ -19,7 +19,7 @@ import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; import teetime.stage.Relay; -class YieldStrategyConfiguration extends AnalysisConfiguration { +class YieldStrategyConfiguration extends ConfigurationContext { private CollectorSink<Object> collectorSink; diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java index 1dc02f2b55cfaf73c64b4a87956be187807520d3..d264619e330fc94d7800e1d81b097b71b41a04f9 100644 --- a/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java +++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java @@ -22,29 +22,29 @@ import static org.junit.Assert.assertTrue; import org.junit.Test; -import teetime.framework.Analysis; -import teetime.framework.AnalysisException; +import teetime.framework.Execution; +import teetime.framework.ExecutionException; import teetime.framework.StageState; public class ExceptionHandlingTest { - private Analysis<ExceptionTestConfiguration> analysis; + private Execution<ExceptionTestConfiguration> execution; public ExceptionTestConfiguration newInstances() { ExceptionTestConfiguration configuration = new ExceptionTestConfiguration(); - analysis = new Analysis<ExceptionTestConfiguration>(configuration, new TestListenerFactory()); + execution = new Execution<ExceptionTestConfiguration>(configuration, new TestListenerFactory()); return configuration; } public void exceptionPassingAndTermination() { newInstances(); - analysis.executeBlocking(); + execution.executeBlocking(); assertEquals(TestListener.exceptionInvoked, 2); // listener did not kill thread too early } public void terminatesAllStages() { ExceptionTestConfiguration config = newInstances(); - analysis.executeBlocking(); + execution.executeBlocking(); assertThat(config.first.getCurrentState(), is(StageState.TERMINATED)); assertThat(config.second.getCurrentState(), is(StageState.TERMINATED)); assertThat(config.third.getCurrentState(), is(StageState.TERMINATED)); @@ -56,7 +56,7 @@ public class ExceptionHandlingTest { boolean exceptionArised = false; try { exceptionPassingAndTermination(); - } catch (AnalysisException e) { + } catch (ExecutionException e) { exceptionArised = true; } assertTrue(exceptionArised); @@ -64,7 +64,7 @@ public class ExceptionHandlingTest { exceptionArised = false; try { terminatesAllStages(); - } catch (AnalysisException e) { + } catch (ExecutionException e) { exceptionArised = true; } assertTrue(exceptionArised); diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java index d985faf23a7c09a0eb2ee3e1727c289047a3f209..f7d1e4503c52766e6ddfe9b1f42b012a3fe4d726 100644 --- a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java +++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java @@ -15,9 +15,9 @@ */ package teetime.framework.exceptionHandling; -import teetime.framework.AnalysisConfiguration; +import teetime.framework.ConfigurationContext; -public class ExceptionTestConfiguration extends AnalysisConfiguration { +public class ExceptionTestConfiguration extends ConfigurationContext { ExceptionTestProducerStage first; ExceptionTestConsumerStage second; @@ -31,7 +31,6 @@ public class ExceptionTestConfiguration extends AnalysisConfiguration { connectPorts(first.getOutputPort(), second.getInputPort()); // this.addThreadableStage(new ExceptionTestStage()); - this.addThreadableStage(first); this.addThreadableStage(second); this.addThreadableStage(third); } diff --git a/src/test/java/teetime/stage/InstanceOfFilterTest.java b/src/test/java/teetime/stage/InstanceOfFilterTest.java index b87d2ec385f2918c6e364531a649fe37c5ac3427..f4152c64ec89bfed0a176d0c9fa76b9e56591973 100644 --- a/src/test/java/teetime/stage/InstanceOfFilterTest.java +++ b/src/test/java/teetime/stage/InstanceOfFilterTest.java @@ -29,9 +29,9 @@ import java.util.List; import org.junit.Before; import org.junit.Test; -import teetime.framework.Analysis; -import teetime.framework.AnalysisConfiguration; -import teetime.framework.AnalysisException; +import teetime.framework.ConfigurationContext; +import teetime.framework.Execution; +import teetime.framework.ExecutionException; import teetime.util.Pair; /** @@ -113,16 +113,16 @@ public class InstanceOfFilterTest { @Test public void filterShouldSendToBothOutputPorts() throws Exception { InstanceOfFilterTestConfig config = new InstanceOfFilterTestConfig(); - Analysis analysis = new Analysis(config); + Execution execution = new Execution(config); try { - analysis.executeBlocking(); - } catch (AnalysisException e) { + execution.executeBlocking(); + } catch (ExecutionException e) { Collection<Pair<Thread, Throwable>> thrownExceptions = e.getThrownExceptions(); // TODO: handle exception } } - private static class InstanceOfFilterTestConfig extends AnalysisConfiguration { + private static class InstanceOfFilterTestConfig extends ConfigurationContext { public InstanceOfFilterTestConfig() { InitialElementProducer<Object> elementProducer = new InitialElementProducer<Object>(); @@ -134,7 +134,6 @@ public class InstanceOfFilterTest { connectPorts(instanceOfFilter.getMatchedOutputPort(), clazzCollector.getInputPort()); connectPorts(instanceOfFilter.getMismatchedOutputPort(), mismatchedCollector.getInputPort()); - addThreadableStage(elementProducer); } }