diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 577f93a57a44804e0d6c0903e2e66e3553e8e560..61b694ea33eae506552220642f122d535ee1b6c2 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -5,65 +5,88 @@ <title>Release Notes</title> </properties> <body> - <release version="Snapshot" date="Daily basis" description="Unstable preview of oncoming versions"> + <release version="Snapshot" date="Daily basis" + description="Unstable preview of oncoming versions"> + <action dev="ntd" type="add" issue="33"> + TeeTime automatically + chooses the correct type of pipe for all connections. + </action> + <action dev="ntd" type="fix" issue="93"> + Introduced a new concept + for composing stages. + </action> + <action dev="ntd" type="remove"> + Marked Pair class as deprecated. + </action> <action dev="ntd" type="add" issue="154"> - All stages will be initialized before starting the analysis. + All stages will be + initialized before starting the analysis. </action> </release> - + <release version="1.1.2" date="12.05.2015" description="Minor bugfixes for 1.1"> <action dev="chw" due-to="Nils C. Ehmke" type="fix" issue="151"> Solved a bug in the merger stage. </action> </release> - + <release version="1.1.1" date="06.05.2015" description="Minor bugfixes for 1.1"> <action dev="ntd" due-to="Nils C. Ehmke" type="fix" issue="151"> Solved a bug which led to a NullPointerExceptions. </action> <action dev="ntd" type="update" issue="102"> - Removed deprecated methods. + Removed deprecated + methods. </action> - + </release> - + <release version="1.1" date="30.04.2015" description="Second release"> <action dev="ntd" type="add" issue="32"> - New concept: exception handling incl. Wiki tutorial. + New concept: exception + handling incl. Wiki tutorial. </action> <action due-to="Nils C. Ehmke" type="add" issue="107"> - New concept: unit test framework for testing a single stage. + New concept: + unit test framework for testing a single stage. </action> <action dev="chw" type="add"> - New class: AbstractTransformation; + New class: AbstractTransformation; Represents a stage with a single input and a single output port. </action> <action dev="chw" type="add"> New class: AbstractFilter; - Represents a stage with a single input and a single output port of the same type. + Represents a + stage with a single input and a single output port of the same type. </action> - - + + <action dev="ntd" type="update" issue="92"> - Analysis.start() is now deprecated. Use Analysis.execute() instead. + Analysis.start() is now + deprecated. Use Analysis.execute() instead. </action> <action due-to="Arne J. Salveter" type="update" issue="120"> - Renamed Stage.executeWithPorts() to Stage.executeStage(). + Renamed + Stage.executeWithPorts() to Stage.executeStage(). </action> <action dev="ntd" type="update" issue="112"> - Removed IterableProducer. Use InitialElementProducer instead. + Removed + IterableProducer. Use InitialElementProducer instead. </action> - - + + <action dev="chw" type="fix" issue="143"> - #143 Null values can block the analysis. + #143 Null values can block + the analysis. </action> <action dev="ntd" type="fix" issue="109"> - #109 Minor bug in ObjectProducer stage. + #109 Minor bug in + ObjectProducer stage. </action> <action dev="ntd" type="fix" issue="75"> - #75 Signal passing is incorrect. + #75 Signal passing is + incorrect. </action> @@ -71,10 +94,12 @@ Updated dependencies. </action> <action dev="ntd" type="update" issue="72"> - Jar is not only published via the Central Maven Repository, but also via our CI server Jenkins. + Jar is not only + published via the Central Maven Repository, but also via our CI + server Jenkins. </action> </release> - + <release version="1.0" date="19.12.2014" description="Initial release"> <action dev="ntd" type="add" issue="66"> Created a new site to diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 1167a87201072af56b9d7063970fcb00862fa35f..6623809dd09befc5ec8081bfd2a44342b9ff1488 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -15,141 +15,16 @@ */ package teetime.framework; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; -import teetime.framework.signal.ISignal; -import teetime.framework.validation.InvalidPortConnection; - /** * Represents a minimal stage that composes several other stages. * - * @since 1.1 + * @since 1.2 * @author Christian Wulf, Nelson Tavares de Sousa * - * @deprecated This concept is not yet implemented in a correct way. As soon as the concept is stable, we will remove the deprecated tag. * */ -@Deprecated -public abstract class AbstractCompositeStage extends Stage { - - private static final IPipeFactory INTRA_PIPE_FACTORY = PipeFactoryRegistry.INSTANCE - .getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - - private final Set<Stage> containingStages = new HashSet<Stage>(); - private final Set<Stage> lastStages = new HashSet<Stage>(); +public abstract class AbstractCompositeStage extends AnalysisConfiguration { protected abstract Stage getFirstStage(); - protected final Collection<? extends Stage> getLastStages() { - return lastStages; - } - - @Override - protected final void executeStage() { - getFirstStage().executeStage(); - } - - @Override - protected final void onSignal(final ISignal signal, final InputPort<?> inputPort) { - getFirstStage().onSignal(signal, inputPort); - } - - @Override - protected final TerminationStrategy getTerminationStrategy() { - return getFirstStage().getTerminationStrategy(); - } - - @Override - protected final void terminate() { - getFirstStage().terminate(); - } - - @Override - protected final boolean shouldBeTerminated() { - return getFirstStage().shouldBeTerminated(); - } - - @Override - protected final InputPort<?>[] getInputPorts() { - return getFirstStage().getInputPorts(); - } - - @Override - protected OutputPort<?>[] getOutputPorts() { - List<OutputPort<?>> outputPorts = new ArrayList<OutputPort<?>>(); - for (final Stage s : getLastStages()) { - outputPorts.addAll(Arrays.asList(s.getOutputPorts())); - } - return outputPorts.toArray(new OutputPort[0]); - } - - @Override - public final StageState getCurrentState() { - return getFirstStage().getCurrentState(); - } - - @Override - public final void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { - for (final Stage s : getLastStages()) { - s.validateOutputPorts(invalidPortConnections); - } - } - - @Override - final void setOwningThread(final Thread owningThread) { - getFirstStage().setOwningThread(owningThread); - super.setOwningThread(owningThread); - } - - protected <T> void connectPorts(final OutputPort<? extends T> out, final InputPort<T> in) { - INTRA_PIPE_FACTORY.create(out, in); - containingStages.add(out.getOwningStage()); - containingStages.add(in.getOwningStage()); - } - - @Override - public final Thread getOwningThread() { - return getFirstStage().getOwningThread(); - } - - @Override - public final void onInitializing() throws Exception { - getFirstStage().onInitializing(); - } - - @Override - public final void onValidating(final List<InvalidPortConnection> invalidPortConnections) { - getFirstStage().onValidating(invalidPortConnections); - } - - @Override - public final void onStarting() throws Exception { - for (Stage stage : containingStages) { - if (stage.getOutputPorts().length == 0) { - lastStages.add(stage); - break; - } - for (OutputPort<?> outputPort : stage.getOutputPorts()) { - if (!containingStages.contains(outputPort.getPipe().getTargetPort().getOwningStage())) { - lastStages.add(stage); - } - } - } - getFirstStage().onStarting(); - } - - @Override - public final void onTerminating() throws Exception { - getFirstStage().onTerminating(); - } - } diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index c65916e440d9b045128d2ac9923a9571535048d1..50e4f96f2b963cb38de20df715375b45829d04ad 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -17,8 +17,10 @@ package teetime.framework; import java.lang.Thread.UncaughtExceptionHandler; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -28,9 +30,14 @@ import org.slf4j.LoggerFactory; import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.exceptionHandling.IExceptionListenerFactory; import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.SingleElementPipeFactory; +import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.pipe.UnboundedSpScPipeFactory; import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.ValidatingSignal; import teetime.framework.validation.AnalysisNotValidException; +import teetime.util.Connection; import teetime.util.Pair; /** @@ -40,6 +47,8 @@ import teetime.util.Pair; * To start the analysis {@link #executeBlocking()} needs to be executed. * This class will automatically create threads and join them without any further commitment. * + * @author Christian Wulf, Nelson Tavares de Sousa + * * @param <T> * the type of the {@link AnalysisConfiguration} */ @@ -59,6 +68,13 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught private final Collection<Pair<Thread, Throwable>> exceptions = new ConcurrentLinkedQueue<Pair<Thread, Throwable>>(); + private boolean initialized; + + private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); + private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); + private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); + private int createdConnections = 0; + private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); /** @@ -98,7 +114,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught // BETTER validate concurrently private void validateStages() { - final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); + final Set<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); for (Stage stage : threadableStageJobs) { // // portConnectionValidator.validate(stage); // } @@ -117,43 +133,16 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught */ private final void init() { - final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); + instantiatePipes(); + + final Set<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); if (threadableStageJobs.isEmpty()) { throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); } for (Stage stage : threadableStageJobs) { - final Thread thread; - - final TerminationStrategy terminationStrategy = stage.getTerminationStrategy(); - switch (terminationStrategy) { - case BY_SIGNAL: { - final RunnableConsumerStage runnable = new RunnableConsumerStage(stage); - thread = createThread(runnable, stage.getId()); - this.consumerThreads.add(thread); - break; - } - case BY_SELF_DECISION: { - final RunnableProducerStage runnable = new RunnableProducerStage(stage); - producerRunnables.add(runnable); - thread = createThread(runnable, stage.getId()); - this.finiteProducerThreads.add(thread); - InitializingSignal initializingSignal = new InitializingSignal(); - stage.onSignal(initializingSignal, null); - break; - } - case BY_INTERRUPT: { - final RunnableProducerStage runnable = new RunnableProducerStage(stage); - producerRunnables.add(runnable); - thread = createThread(runnable, stage.getId()); - InitializingSignal initializingSignal = new InitializingSignal(); - stage.onSignal(initializingSignal, null); - this.infiniteProducerThreads.add(thread); - break; - } - default: - throw new IllegalStateException("Unhandled termination strategy: " + terminationStrategy); - } + final Thread thread = initializeThreadableStages(stage); + final Set<Stage> intraStages = traverseIntraStages(stage); final AbstractExceptionListener newListener = factory.createInstance(); initializeIntraStages(intraStages, thread, newListener); @@ -167,6 +156,85 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } + private Thread initializeThreadableStages(final Stage stage) { + final Thread thread; + + final TerminationStrategy terminationStrategy = stage.getTerminationStrategy(); + switch (terminationStrategy) { + case BY_SIGNAL: { + final RunnableConsumerStage runnable = new RunnableConsumerStage(stage); + thread = createThread(runnable, stage.getId()); + this.consumerThreads.add(thread); + break; + } + case BY_SELF_DECISION: { + final RunnableProducerStage runnable = new RunnableProducerStage(stage); + producerRunnables.add(runnable); + thread = createThread(runnable, stage.getId()); + this.finiteProducerThreads.add(thread); + InitializingSignal initializingSignal = new InitializingSignal(); + stage.onSignal(initializingSignal, null); + break; + } + case BY_INTERRUPT: { + final RunnableProducerStage runnable = new RunnableProducerStage(stage); + producerRunnables.add(runnable); + thread = createThread(runnable, stage.getId()); + InitializingSignal initializingSignal = new InitializingSignal(); + stage.onSignal(initializingSignal, null); + this.infiniteProducerThreads.add(thread); + break; + } + default: + throw new IllegalStateException("Unhandled termination strategy: " + terminationStrategy); + } + return thread; + } + + private void instantiatePipes() { + Integer i = new Integer(0); + Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); + Set<Stage> threadableStageJobs = configuration.getThreadableStageJobs(); + for (Stage threadableStage : threadableStageJobs) { + i++; + colors.put(threadableStage, i); // Markiere den threadHead + colorAndConnectStages(i, colors, threadableStage); + } + if (configuration.getConnections().size() != createdConnections) { + throw new IllegalStateException("Remaining " + (configuration.getConnections().size() - createdConnections) + " connection(s)"); + } + } + + public void colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage) { + Set<Stage> threadableStageJobs = configuration.getThreadableStageJobs(); + for (Connection connection : configuration.getConnections()) { + if (connection.getSourcePort().getOwningStage() == threadableStage) { + Stage targetStage = connection.getTargetPort().getOwningStage(); + Integer targetColor = new Integer(0); + if (colors.containsKey(targetStage)) { + targetColor = colors.get(targetStage); + } + if (threadableStageJobs.contains(targetStage) && targetColor.compareTo(i) != 0) { + if (connection.getCapacity() != 0) { + interBoundedThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort(), connection.getCapacity()); + } else { + interUnboundedThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort(), 4); + } + } else { + if (colors.containsKey(targetStage)) { + if (!colors.get(targetStage).equals(i)) { + throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") + } + } + intraThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort()); + colors.put(targetStage, i); + colorAndConnectStages(i, colors, targetStage); + } + createdConnections++; + } + } + } + private Thread createThread(final AbstractRunnableStage runnable, final String name) { final Thread thread = new Thread(runnable); thread.setUncaughtExceptionHandler(this); diff --git a/src/main/java/teetime/framework/AnalysisConfiguration.java b/src/main/java/teetime/framework/AnalysisConfiguration.java index 7ffc7235585ecbcdb24cedd78ad507553896d1d8..26139ce63fb552b01dd65c886c314ad3a4eaedd7 100644 --- a/src/main/java/teetime/framework/AnalysisConfiguration.java +++ b/src/main/java/teetime/framework/AnalysisConfiguration.java @@ -15,14 +15,15 @@ */ package teetime.framework; -import java.util.LinkedList; -import java.util.List; +import java.util.HashSet; +import java.util.Set; import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.util.Connection; /** * Represents a configuration of connected stages, which is needed to run a analysis. @@ -30,8 +31,10 @@ import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; */ public abstract class AnalysisConfiguration { - private final List<Stage> threadableStageJobs = new LinkedList<Stage>(); + private final Set<Stage> threadableStageJobs = new HashSet<Stage>(); + private final Set<Connection<?>> connections = new HashSet<Connection<?>>(); + @SuppressWarnings("deprecation") private static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; /** @@ -47,7 +50,7 @@ public abstract class AnalysisConfiguration { */ private final static IPipeFactory interUnboundedThreadFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, true); - List<Stage> getThreadableStageJobs() { + Set<Stage> getThreadableStageJobs() { return this.threadableStageJobs; } @@ -61,6 +64,20 @@ public abstract class AnalysisConfiguration { this.threadableStageJobs.add(stage); } + /** + * Execute this method, to add a CompositeStage to the configuration, which should be executed in a own thread. + * + * @param stage + * A arbitrary CompositeStage, which will be added to the configuration and executed in a thread. + */ + protected void addThreadableStage(final AbstractCompositeStage stage) { + this.threadableStageJobs.add(stage.getFirstStage()); + this.connections.addAll(stage.getConnections()); + for (Stage threadableStage : stage.getThreadableStageJobs()) { + this.addThreadableStage(threadableStage); + } + } + /** * Connects two stages with a pipe within the same thread. * @@ -68,7 +85,10 @@ public abstract class AnalysisConfiguration { * @param targetPort * @return * the pipe instance which connects the two given stages + * + * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead. */ + @Deprecated protected static <T> IPipe connectIntraThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { return intraThreadFactory.create(sourcePort, targetPort); } @@ -80,7 +100,10 @@ public abstract class AnalysisConfiguration { * @param targetPort * @return * the pipe instance which connects the two given stages + * + * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead. */ + @Deprecated protected static <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { return interBoundedThreadFactory.create(sourcePort, targetPort); } @@ -92,7 +115,10 @@ public abstract class AnalysisConfiguration { * @param targetPort * @return * the pipe instance which connects the two given stages + * + * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead. */ + @Deprecated protected static <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { return interUnboundedThreadFactory.create(sourcePort, targetPort); } @@ -105,7 +131,10 @@ public abstract class AnalysisConfiguration { * @param capacity * capacity of the underlying queue * @return + * + * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead. */ + @Deprecated protected static <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { return interBoundedThreadFactory.create(sourcePort, targetPort, capacity); } @@ -118,9 +147,47 @@ public abstract class AnalysisConfiguration { * @param capacity * capacity of the underlying queue * @return + * + * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead. */ + @Deprecated protected static <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { return interUnboundedThreadFactory.create(sourcePort, targetPort, capacity); } + /** + * Connects two ports with a pipe. + * + * @param sourcePort + * port from the sending stage + * @param targetPort + * port from the receiving stage + */ + protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + connectPorts(sourcePort, targetPort, 4); + } + + /** + * Connects to ports with a pipe of a certain capacity + * + * @param sourcePort + * port from the sending stage + * @param targetPort + * port from the receiving stage + * @param capacity + * the pipe is set to this capacity, if the value is greater than 0. If it is 0, than the pipe is unbounded, thus growing of the pipe is enabled. + */ + protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + connections.add(new Connection<T>(sourcePort, targetPort, capacity)); + } + + /** + * Returns a list of pairs, which describe the connections among all stages. + * + * @return a list of pairs of Out- and InputPorts, which are connected + */ + protected Set<Connection<?>> getConnections() { + return connections; + } + } diff --git a/src/main/java/teetime/framework/test/StageTester.java b/src/main/java/teetime/framework/test/StageTester.java index 4ece770476f6db94c3b217c611445600b1d4ddc4..59092010e96fdb08cf4e783d8aae1143eb317313 100644 --- a/src/main/java/teetime/framework/test/StageTester.java +++ b/src/main/java/teetime/framework/test/StageTester.java @@ -88,7 +88,7 @@ public final class StageTester { public Configuration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) { for (InputHolder<?> inputHolder : inputHolders) { final InitialElementProducer<Object> producer = new InitialElementProducer<Object>(inputHolder.getInput()); - connectBoundedInterThreads(producer.getOutputPort(), inputHolder.getPort()); + connectPorts(producer.getOutputPort(), inputHolder.getPort()); addThreadableStage(producer); } @@ -96,7 +96,7 @@ public final class StageTester { for (OutputHolder<?> outputHolder : outputHolders) { final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutputElements()); - connectIntraThreads(outputHolder.getPort(), sink.getInputPort()); + connectPorts(outputHolder.getPort(), sink.getInputPort()); } } } diff --git a/src/main/java/teetime/util/Connection.java b/src/main/java/teetime/util/Connection.java new file mode 100644 index 0000000000000000000000000000000000000000..c8e3740d632729986e0d0a718b82eec837c0cf43 --- /dev/null +++ b/src/main/java/teetime/util/Connection.java @@ -0,0 +1,72 @@ +package teetime.util; + +import teetime.framework.InputPort; +import teetime.framework.OutputPort; + +public class Connection<T> { + + private final OutputPort<? extends T> sourcePort; + private final InputPort<T> targetPort; + private final int capacity; + + public Connection(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + this(sourcePort, targetPort, 4); + } + + public Connection(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + this.sourcePort = sourcePort; + this.targetPort = targetPort; + this.capacity = capacity; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((sourcePort == null) ? 0 : sourcePort.hashCode()); + result = prime * result + ((targetPort == null) ? 0 : targetPort.hashCode()); + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Connection<?> other = (Connection<?>) obj; + if (sourcePort == null) { + if (other.sourcePort != null) { + return false; + } + } else if (!sourcePort.equals(other.sourcePort)) { + return false; + } + if (targetPort == null) { + if (other.targetPort != null) { + return false; + } + } else if (!targetPort.equals(other.targetPort)) { + return false; + } + return true; + } + + public int getCapacity() { + return capacity; + } + + public OutputPort<? extends T> getSourcePort() { + return sourcePort; + } + + public InputPort<T> getTargetPort() { + return targetPort; + } + +} diff --git a/src/main/java/teetime/util/Pair.java b/src/main/java/teetime/util/Pair.java index 96ea6a61df295677dec8ce450d7cd16da5643dbb..dda9883445c63c766e7e3b639bbeae16996c88c0 100644 --- a/src/main/java/teetime/util/Pair.java +++ b/src/main/java/teetime/util/Pair.java @@ -15,6 +15,15 @@ */ package teetime.util; +/** + * + * @param <F> + * @param <S> + * + * @deprecated since 1.2 + */ +@Deprecated +// See http://stackoverflow.com/questions/156275/what-is-the-equivalent-of-the-c-pairl-r-in-java public final class Pair<F, S> { private final F first; diff --git a/src/test/java/teetime/examples/cipher/CipherConfiguration.java b/src/test/java/teetime/examples/cipher/CipherConfiguration.java index fca97df2eca0b734b77004a5927bb8738f4dfff9..95ba33f4357662d7014573bfe67f6f5d810582f7 100644 --- a/src/test/java/teetime/examples/cipher/CipherConfiguration.java +++ b/src/test/java/teetime/examples/cipher/CipherConfiguration.java @@ -40,12 +40,12 @@ public class CipherConfiguration extends AnalysisConfiguration { final CipherStage decrypt = new CipherStage(password, CipherMode.DECRYPT); final ByteArrayFileWriter writer = new ByteArrayFileWriter(output); - connectIntraThreads(init.getOutputPort(), f2b.getInputPort()); - connectIntraThreads(f2b.getOutputPort(), enc.getInputPort()); - connectIntraThreads(enc.getOutputPort(), comp.getInputPort()); - connectIntraThreads(comp.getOutputPort(), decomp.getInputPort()); - connectIntraThreads(decomp.getOutputPort(), decrypt.getInputPort()); - connectIntraThreads(decrypt.getOutputPort(), writer.getInputPort()); + connectPorts(init.getOutputPort(), f2b.getInputPort()); + connectPorts(f2b.getOutputPort(), enc.getInputPort()); + connectPorts(enc.getOutputPort(), comp.getInputPort()); + connectPorts(comp.getOutputPort(), decomp.getInputPort()); + connectPorts(decomp.getOutputPort(), decrypt.getInputPort()); + connectPorts(decrypt.getOutputPort(), writer.getInputPort()); // this.getFiniteProducerStages().add(init); this.addThreadableStage(init); diff --git a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java index e6b0f9d7e4e26f34295dd3ebe7a09e7f11020927..15c325dad0a5cca3f7df333d092a8444ec16af55 100644 --- a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java +++ b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java @@ -43,12 +43,12 @@ public class TokenizerConfiguration extends AnalysisConfiguration { final Tokenizer tokenizer = new Tokenizer(" "); this.counter = new Counter<String>(); - connectIntraThreads(init.getOutputPort(), f2b.getInputPort()); - connectIntraThreads(f2b.getOutputPort(), decomp.getInputPort()); - connectIntraThreads(decomp.getOutputPort(), decrypt.getInputPort()); - connectIntraThreads(decrypt.getOutputPort(), b2s.getInputPort()); - connectIntraThreads(b2s.getOutputPort(), tokenizer.getInputPort()); - connectIntraThreads(tokenizer.getOutputPort(), this.counter.getInputPort()); + connectPorts(init.getOutputPort(), f2b.getInputPort()); + connectPorts(f2b.getOutputPort(), decomp.getInputPort()); + connectPorts(decomp.getOutputPort(), decrypt.getInputPort()); + connectPorts(decrypt.getOutputPort(), b2s.getInputPort()); + connectPorts(b2s.getOutputPort(), tokenizer.getInputPort()); + connectPorts(tokenizer.getOutputPort(), this.counter.getInputPort()); this.addThreadableStage(init); } diff --git a/src/test/java/teetime/framework/AnalysisTest.java b/src/test/java/teetime/framework/AnalysisTest.java index f9905c60a8fac7d41cb99320757d98a05dc6f8fa..5c347dfe763eaa8627db1930e973dcf26eac1594 100644 --- a/src/test/java/teetime/framework/AnalysisTest.java +++ b/src/test/java/teetime/framework/AnalysisTest.java @@ -18,14 +18,19 @@ package teetime.framework; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import teetime.stage.InitialElementProducer; +import teetime.stage.InstanceOfFilter; +import teetime.stage.basic.Sink; import teetime.util.StopWatch; public class AnalysisTest { @@ -71,7 +76,7 @@ public class AnalysisTest { public TestConfig() { final InitialElementProducer<String> init = new InitialElementProducer<String>("Hello"); delay = new DelayAndTerminate(DELAY_IN_MS); - connectIntraThreads(init.getOutputPort(), delay.getInputPort()); + connectPorts(init.getOutputPort(), delay.getInputPort()); addThreadableStage(init); } } @@ -98,4 +103,51 @@ public class AnalysisTest { } + @Test + public void testInstantiatePipes() throws Exception { + Analysis<AnalysisTestConfig> interAnalysis = new Analysis<AnalysisTestConfig>(new AnalysisTestConfig(true)); + assertThat(interAnalysis.getConfiguration().init.getOwningThread(), is(not(interAnalysis.getConfiguration().sink.getOwningThread()))); + + Analysis<AnalysisTestConfig> intraAnalysis = new Analysis<AnalysisTestConfig>(new AnalysisTestConfig(false)); + assertThat(intraAnalysis.getConfiguration().init.getOwningThread(), is(intraAnalysis.getConfiguration().sink.getOwningThread())); + } + + private class AnalysisTestConfig extends AnalysisConfiguration { + public InitialElementProducer<Object> init = new InitialElementProducer<Object>(); + public Sink<Object> sink = new Sink<Object>(); + + public AnalysisTestConfig(final boolean inter) { + connectPorts(init.getOutputPort(), sink.getInputPort()); + addThreadableStage(init); + if (inter) { + addThreadableStage(sink); + } + } + } + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testInstantiatePipesIncorrectConfiguration() { + thrown.expect(IllegalStateException.class); + thrown.expectMessage("Crossing threads"); + InvalidTestConfig configuration = new InvalidTestConfig(); + new Analysis<InvalidTestConfig>(configuration); + } + + private class InvalidTestConfig extends AnalysisConfiguration { + public InitialElementProducer<Object> init = new InitialElementProducer<Object>(); + public InstanceOfFilter<Object, Object> iof = new InstanceOfFilter<Object, Object>(Object.class); + public Sink<Object> sink = new Sink<Object>(); + + public InvalidTestConfig() { + connectPorts(init.getOutputPort(), iof.getInputPort()); + connectPorts(iof.getMatchedOutputPort(), sink.getInputPort()); + connectPorts(init.createOutputPort(), sink.createInputPort()); + addThreadableStage(init); + addThreadableStage(iof); + } + } + } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index 892c3cd0de01cd336d150f66e418535b2d439ed1..74d0cd73531a604c7170771fb699788ae80cbde0 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -35,6 +35,7 @@ public class RunnableConsumerStageTestConfiguration extends AnalysisConfiguratio CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements); addThreadableStage(collectorSink); + // Can not use createPorts, as the if condition above will lead to an exception connectBoundedInterThreads(producer.getOutputPort(), collectorSink.getInputPort()); this.collectorSink = collectorSink; diff --git a/src/test/java/teetime/framework/RunnableProducerStageTest.java b/src/test/java/teetime/framework/RunnableProducerStageTest.java index 387dd622513260e9ea08a0df5b21e7b06064513c..4f2120ac2207c57286c23b61e0d91a09b5b296b6 100644 --- a/src/test/java/teetime/framework/RunnableProducerStageTest.java +++ b/src/test/java/teetime/framework/RunnableProducerStageTest.java @@ -19,8 +19,7 @@ public class RunnableProducerStageTest { // Not running, but initialized assertFalse(testStage.executed && !testStage.initialized); runnable.triggerStartingSignal(); - - while (!testStage.shouldBeTerminated()) { + while (!(testStage.getCurrentState() == StageState.TERMINATED)) { Thread.yield(); } assertTrue(testStage.executed); diff --git a/src/test/java/teetime/framework/RunnableTestStage.java b/src/test/java/teetime/framework/RunnableTestStage.java index 034474a3a1ef81503f49f20d9d19f5a1aed7110c..47f4cd0c31f542a2e8b40a9626cb001b47ec4c6f 100644 --- a/src/test/java/teetime/framework/RunnableTestStage.java +++ b/src/test/java/teetime/framework/RunnableTestStage.java @@ -1,6 +1,5 @@ package teetime.framework; - class RunnableTestStage extends AbstractProducerStage<Object> { boolean executed, initialized; diff --git a/src/test/java/teetime/framework/StageTest.java b/src/test/java/teetime/framework/StageTest.java index 5fcc1dfee8718ee9335dab311cb93bb363a3cc84..19580dcb72e9573163c2c40755d4f18a45b60e4c 100644 --- a/src/test/java/teetime/framework/StageTest.java +++ b/src/test/java/teetime/framework/StageTest.java @@ -60,7 +60,7 @@ public class StageTest { public TestConfig() { init = new InitialElementProducer<String>("Hello"); delay = new DelayAndTerminate(0); - connectIntraThreads(init.getOutputPort(), delay.getInputPort()); + connectPorts(init.getOutputPort(), delay.getInputPort()); addThreadableStage(init); } } diff --git a/src/test/java/teetime/framework/TraversorTest.java b/src/test/java/teetime/framework/TraversorTest.java index 35a7826b5800186a72416d97e18a58f8662292b9..f895104874d57a005aa66e0fa6788c228b36747f 100644 --- a/src/test/java/teetime/framework/TraversorTest.java +++ b/src/test/java/teetime/framework/TraversorTest.java @@ -15,7 +15,10 @@ */ package teetime.framework; -import static org.junit.Assert.assertTrue; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import java.io.File; import java.util.HashSet; @@ -23,7 +26,6 @@ import java.util.Set; import org.junit.Test; -import teetime.framework.pipe.IPipe; import teetime.stage.CountingMapMerger; import teetime.stage.InitialElementProducer; import teetime.stage.basic.distributor.Distributor; @@ -40,12 +42,14 @@ public class TraversorTest { @Test public void traverse() { TestConfiguration tc = new TestConfiguration(); + new Analysis<TestConfiguration>(tc); traversor.traverse(tc.init); Set<Stage> comparingStages = new HashSet<Stage>(); comparingStages.add(tc.init); comparingStages.add(tc.f2b); comparingStages.add(tc.distributor); - assertTrue(comparingStages.equals(traversor.getVisitedStage())); + assertThat(tc.distributor.getOwningThread(), is(not(tc.distributor.getOutputPorts()[0].pipe.getTargetPort().getOwningStage().getOwningThread()))); + assertEquals(comparingStages, traversor.getVisitedStage()); } // WordCounterConfiguration @@ -67,8 +71,8 @@ public class TraversorTest { // CountingMapMerger (already as field) // Connecting the stages of the first part of the config - connectIntraThreads(init.getOutputPort(), f2b.getInputPort()); - connectIntraThreads(f2b.getOutputPort(), distributor.getInputPort()); + connectPorts(init.getOutputPort(), f2b.getInputPort()); + connectPorts(f2b.getOutputPort(), distributor.getInputPort()); // Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages for (int i = 0; i < threads; i++) { @@ -76,15 +80,15 @@ public class TraversorTest { final WordCounter wc = new WordCounter(); // intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort()); - final IPipe distributorPipe = connectBoundedInterThreads(distributor.getNewOutputPort(), wc.getInputPort(), 10000); - final IPipe mergerPipe = connectBoundedInterThreads(wc.getOutputPort(), merger.getNewInputPort()); + connectPorts(distributor.getNewOutputPort(), wc.getInputPort()); + connectPorts(wc.getOutputPort(), merger.getNewInputPort()); // Add WordCounter as a threadable stage, so it runs in its own thread addThreadableStage(wc); } // Connect the stages of the last part - connectIntraThreads(merger.getOutputPort(), result.getInputPort()); + connectPorts(merger.getOutputPort(), result.getInputPort()); // Add the first and last part to the threadable stages addThreadableStage(init); diff --git a/src/test/java/teetime/framework/WaitStrategyConfiguration.java b/src/test/java/teetime/framework/WaitStrategyConfiguration.java index 4d4dff162c62b5b719effb511beceff0148f32a1..191f8460e6f68a04007a05f7cc29338e3c2789f2 100644 --- a/src/test/java/teetime/framework/WaitStrategyConfiguration.java +++ b/src/test/java/teetime/framework/WaitStrategyConfiguration.java @@ -42,7 +42,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration { Clock clock = new Clock(); clock.setInitialDelayInMs(initialDelayInMs); - connectBoundedInterThreads(clock.getOutputPort(), delay.getTimestampTriggerInputPort()); + connectPorts(clock.getOutputPort(), delay.getTimestampTriggerInputPort()); return clock; } @@ -51,7 +51,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration { InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements); delay = new Delay<Object>(); - connectIntraThreads(initialElementProducer.getOutputPort(), delay.getInputPort()); + connectPorts(initialElementProducer.getOutputPort(), delay.getInputPort()); return initialElementProducer; } @@ -62,8 +62,8 @@ class WaitStrategyConfiguration extends AnalysisConfiguration { // relay.setIdleStrategy(new WaitStrategy(relay)); - connectBoundedInterThreads(delay.getOutputPort(), relay.getInputPort()); - connectIntraThreads(relay.getOutputPort(), collectorSink.getInputPort()); + connectPorts(delay.getOutputPort(), relay.getInputPort()); + connectPorts(relay.getOutputPort(), collectorSink.getInputPort()); this.collectorSink = collectorSink; diff --git a/src/test/java/teetime/framework/YieldStrategyConfiguration.java b/src/test/java/teetime/framework/YieldStrategyConfiguration.java index f3e48a4d32c21c1e8bf1c01087657593e4d54099..a63025947a38b2db8f3e12e3bad6c1637d52fa8a 100644 --- a/src/test/java/teetime/framework/YieldStrategyConfiguration.java +++ b/src/test/java/teetime/framework/YieldStrategyConfiguration.java @@ -44,8 +44,8 @@ class YieldStrategyConfiguration extends AnalysisConfiguration { // relay.setIdleStrategy(new YieldStrategy()); - connectBoundedInterThreads(producer.getOutputPort(), relay.getInputPort()); - connectIntraThreads(relay.getOutputPort(), collectorSink.getInputPort()); + connectPorts(producer.getOutputPort(), relay.getInputPort()); + connectPorts(relay.getOutputPort(), collectorSink.getInputPort()); this.collectorSink = collectorSink; diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java index a9a1d1dfbbea2638d1c5886566426c250f2f1a0e..d985faf23a7c09a0eb2ee3e1727c289047a3f209 100644 --- a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java +++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java @@ -28,7 +28,7 @@ public class ExceptionTestConfiguration extends AnalysisConfiguration { second = new ExceptionTestConsumerStage(); third = new ExceptionTestProducerStage(); - connectBoundedInterThreads(first.getOutputPort(), second.getInputPort()); + connectPorts(first.getOutputPort(), second.getInputPort()); // this.addThreadableStage(new ExceptionTestStage()); this.addThreadableStage(first); diff --git a/src/test/java/teetime/stage/InstanceOfFilterTest.java b/src/test/java/teetime/stage/InstanceOfFilterTest.java index 8abd3018b5122f409e023e936f30dd6a1377892a..b87d2ec385f2918c6e364531a649fe37c5ac3427 100644 --- a/src/test/java/teetime/stage/InstanceOfFilterTest.java +++ b/src/test/java/teetime/stage/InstanceOfFilterTest.java @@ -130,9 +130,9 @@ public class InstanceOfFilterTest { CollectorSink<Clazz> clazzCollector = new CollectorSink<Clazz>(); CollectorSink<Object> mismatchedCollector = new CollectorSink<Object>(); - connectIntraThreads(elementProducer.getOutputPort(), instanceOfFilter.getInputPort()); - connectIntraThreads(instanceOfFilter.getMatchedOutputPort(), clazzCollector.getInputPort()); - connectIntraThreads(instanceOfFilter.getMismatchedOutputPort(), mismatchedCollector.getInputPort()); + connectPorts(elementProducer.getOutputPort(), instanceOfFilter.getInputPort()); + connectPorts(instanceOfFilter.getMatchedOutputPort(), clazzCollector.getInputPort()); + connectPorts(instanceOfFilter.getMismatchedOutputPort(), mismatchedCollector.getInputPort()); addThreadableStage(elementProducer); }