diff --git a/conf/quality-config/pmd-ruleset.xml b/conf/quality-config/pmd-ruleset.xml index ebaa4801325daf8dbfebcfeebc764c0eb159efd5..4f0d9fb149924af25f77ff4704104129ec3c897e 100644 --- a/conf/quality-config/pmd-ruleset.xml +++ b/conf/quality-config/pmd-ruleset.xml @@ -3,7 +3,7 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pmd.sourceforge.net/ruleset/2.0.0 http://pmd.sourceforge.net/ruleset_2_0_0.xsd"> <description>This ruleset checks my code for bad stuff</description> - + <exclude-pattern>.*/target/.*</exclude-pattern> <!-- warning: if you reference a pmd rulesets xml file more than once, only the first rule is applied; so check for duplicates --> diff --git a/pom.xml b/pom.xml index e40e11d5600f3342b84e90c083e40ffc8d27a63e..48829231fb3503479dc2f87eee23b1273786bad4 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ <java.version>1.6</java.version> <checkstyle.version>2.16</checkstyle.version> - <findbugs.version>3.0.1</findbugs.version> + <findbugs.version>3.0.2</findbugs.version> <pmd.version>3.5</pmd.version> <javadoc.version>2.10.3</javadoc.version> </properties> @@ -190,6 +190,25 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-javadoc-plugin</artifactId> <version>${javadoc.version}</version> + <configuration> + <tags> + <tag> + <name>stage.input</name> + <placement>t</placement> + <head>Stage input:</head> + </tag> + <tag> + <name>stage.output</name> + <placement>t</placement> + <head>Stage output:</head> + </tag> + <tag> + <name>stage.sketch</name> + <placement>t</placement> + <head>Stage sketch:</head> + </tag> + </tags> + </configuration> <executions> <execution> <id>attach-javadocs</id> @@ -203,7 +222,7 @@ <plugin> <groupId>org.sonatype.plugins</groupId> <artifactId>nexus-staging-maven-plugin</artifactId> - <version>1.6.5</version> + <version>1.6.6</version> <extensions>true</extensions> <configuration> <serverId>teetime-deployment</serverId> @@ -358,7 +377,7 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-project-info-reports-plugin</artifactId> - <version>2.8</version> + <version>2.8.1</version> </plugin> </plugins> </build> @@ -414,6 +433,23 @@ <version>${javadoc.version}</version> <configuration> <destDir>${javadocOutputDir}</destDir> + <tags> + <tag> + <name>stage.input</name> + <placement>t</placement> + <head>Stage input:</head> + </tag> + <tag> + <name>stage.output</name> + <placement>t</placement> + <head>Stage output:</head> + </tag> + <tag> + <name>stage.sketch</name> + <placement>t</placement> + <head>Stage sketch:</head> + </tag> + </tags> </configuration> </plugin> </plugins> diff --git a/src/main/java/teetime/framework/A1ThreadableStageCollector.java b/src/main/java/teetime/framework/A1ThreadableStageCollector.java index 0b34c30bdb024cde2a78756d2dd4524514736cd5..387ad88f18ec95014136bfa6aedf145d1e11aadf 100644 --- a/src/main/java/teetime/framework/A1ThreadableStageCollector.java +++ b/src/main/java/teetime/framework/A1ThreadableStageCollector.java @@ -19,6 +19,7 @@ import java.util.HashSet; import java.util.Set; import teetime.framework.Traverser.VisitorBehavior; +import teetime.framework.pipe.DummyPipe; /** * Searches for threadable stages @@ -44,4 +45,10 @@ class A1ThreadableStageCollector implements ITraverserVisitor { return VisitorBehavior.CONTINUE; } + @Override + public void visit(final DummyPipe pipe, final AbstractPort<?> port) { + // TODO Auto-generated method stub + + } + } diff --git a/src/main/java/teetime/framework/A2InvalidThreadAssignmentCheck.java b/src/main/java/teetime/framework/A2InvalidThreadAssignmentCheck.java index 043c31ee365f61ce0fd2927f51b55c393e71c2f7..fd4712307644ecd9e4305f31f3b491961e9ad925 100644 --- a/src/main/java/teetime/framework/A2InvalidThreadAssignmentCheck.java +++ b/src/main/java/teetime/framework/A2InvalidThreadAssignmentCheck.java @@ -21,6 +21,7 @@ import com.carrotsearch.hppc.ObjectIntHashMap; import com.carrotsearch.hppc.ObjectIntMap; import teetime.framework.Traverser.VisitorBehavior; +import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.IPipe; /** @@ -90,5 +91,11 @@ public class A2InvalidThreadAssignmentCheck { return VisitorBehavior.STOP; } + @Override + public void visit(final DummyPipe pipe, final AbstractPort<?> port) { + // TODO Auto-generated method stub + + } + } } diff --git a/src/main/java/teetime/framework/A3PipeInstantiation.java b/src/main/java/teetime/framework/A3PipeInstantiation.java index e5b6ea062f46983efc659108b24238bc17b2ca82..72afb486ed82d9dc7d9bcaa8961aa6ded9d7a581 100644 --- a/src/main/java/teetime/framework/A3PipeInstantiation.java +++ b/src/main/java/teetime/framework/A3PipeInstantiation.java @@ -22,6 +22,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import teetime.framework.Traverser.VisitorBehavior; +import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.InstantiationPipe; @@ -90,4 +91,12 @@ class A3PipeInstantiation implements ITraverserVisitor { } } + + @Override + public void visit(final DummyPipe pipe, final AbstractPort<?> port) { + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Unconnected port " + port + " in stage " + port.getOwningStage().getId()); + } + } + } diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index ad1fc8ceaf4a3ba94a4dbca71a9305bd942d3868..4a835710077113e90334e61b0991e7f1dad0b7dc 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -32,30 +32,6 @@ public abstract class AbstractCompositeStage { */ private static final int DEFAULT_CAPACITY = 4; - /** - * Execute this method, to add a stage to the configuration, which should be executed in a own thread. - * - * @param stage - * A arbitrary stage, which will be added to the configuration and executed in a thread. - */ - protected final void addThreadableStage(final Stage stage) { - this.addThreadableStage(stage, stage.getId()); - } - - /** - * Execute this method, to add a stage to the configuration, which should be executed in a own thread. - * - * @param stage - * A arbitrary stage, which will be added to the configuration and executed in a thread. - * @param threadName - * A string which can be used for debugging. - */ - protected void addThreadableStage(final Stage stage, final String threadName) { - AbstractRunnableStage runnable = AbstractRunnableStage.create(stage); - Thread newThread = new TeeTimeThread(runnable, threadName); - stage.setOwningThread(newThread); - } - /** * Connects two ports with a pipe with a default capacity of currently {@value #DEFAULT_CAPACITY}. * @@ -85,7 +61,7 @@ public abstract class AbstractCompositeStage { protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { if (sourcePort.getOwningStage().getInputPorts().size() == 0) { if (sourcePort.getOwningStage().getOwningThread() == null) { - addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId()); + sourcePort.getOwningStage().declareActive(); } } diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 6496217823a2f07bbff3fe2f16c83472b803db13..447b243c8a0d8d1fd709304281de9d62e56478e3 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -15,8 +15,10 @@ */ package teetime.framework; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import teetime.framework.pipe.IPipe; @@ -27,6 +29,7 @@ import teetime.util.framework.port.PortRemovedListener; public abstract class AbstractStage extends Stage { + private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>(); private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>(); private final PortList<InputPort<?>> inputPorts = new PortList<InputPort<?>>(); @@ -54,7 +57,21 @@ public abstract class AbstractStage extends Stage { @SuppressWarnings("PMD.DataflowAnomalyAnalysis") @Override public void onSignal(final ISignal signal, final InputPort<?> inputPort) { - if (!this.signalAlreadyReceived(signal, inputPort)) { + Class<? extends ISignal> signalClass = signal.getClass(); + + Set<InputPort<?>> signalReceivedInputPorts; + if (signalMap.containsKey(signalClass)) { + signalReceivedInputPorts = signalMap.get(signalClass); + } else { + signalReceivedInputPorts = new HashSet<InputPort<?>>(); + signalMap.put(signalClass, signalReceivedInputPorts); + } + + if (!signalReceivedInputPorts.add(inputPort)) { + this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort); + return; + } + if (signal.mayBeTriggered(signalReceivedInputPorts, getInputPorts())) { try { signal.trigger(this); } catch (Exception e) { diff --git a/src/main/java/teetime/framework/Configuration.java b/src/main/java/teetime/framework/Configuration.java index 4102d7fcb5f2e8df890a515ff89d7e3d7e5ecdee..de6ba150e10440c8800b09b368b0e3ce3f138da0 100644 --- a/src/main/java/teetime/framework/Configuration.java +++ b/src/main/java/teetime/framework/Configuration.java @@ -31,6 +31,7 @@ public abstract class Configuration extends AbstractCompositeStage { private final AbstractExceptionListenerFactory<?> factory; private final ConfigurationContext context; + private boolean initialized; private boolean executed; private Stage startStage; @@ -43,11 +44,19 @@ public abstract class Configuration extends AbstractCompositeStage { this.context = new ConfigurationContext(this); } - boolean isExecuted() { + boolean isInitialized() { + return initialized; + } + + void setInitialized(final boolean executed) { + this.initialized = executed; + } + + public boolean isExecuted() { return executed; } - void setExecuted(final boolean executed) { + public void setExecuted(final boolean executed) { this.executed = executed; } @@ -55,10 +64,14 @@ public abstract class Configuration extends AbstractCompositeStage { return factory; } - @Override - protected void addThreadableStage(final Stage stage, final String threadName) { - startStage = stage; // memorize an arbitrary stage as starting point for traversing - super.addThreadableStage(stage, threadName); + /** + * Register pipes if your configuration only relies on custom pipes and therefore {@link #connectPorts(OutputPort, InputPort)} is never called. + * + * @param pipe + * A custom pipe instance + */ + protected void registerCustomPipe(final AbstractPipe<?> pipe) { + startStage = pipe.getSourcePort().getOwningStage(); // memorize an arbitrary stage as starting point for traversing } @Override diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index 8c356665d14b5f23352fc9f5eb1369d8f1747dde..054821bbbfb075615425c3baddc183e5d49e4ce9 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -19,7 +19,7 @@ import java.util.Set; /** * Represents a context that is used by a configuration and composite stages to connect ports, for example. - * Stages can be added by executing {@link #addThreadableStage(Stage)}. + * Stages can be added by executing {@link #declareActive(Stage)}. * * @since 2.0 */ diff --git a/src/main/java/teetime/framework/Execution.java b/src/main/java/teetime/framework/Execution.java index 441791eeef72d7cd7fe0df7ca19d02c12bf8e239..a8c98167c67bbf475a3957a15147aecd55b678b3 100644 --- a/src/main/java/teetime/framework/Execution.java +++ b/src/main/java/teetime/framework/Execution.java @@ -64,10 +64,10 @@ public final class Execution<T extends Configuration> { public Execution(final T configuration, final boolean validationEnabled) { this.configuration = configuration; this.configurationContext = configuration.getContext(); - if (configuration.isExecuted()) { + if (configuration.isInitialized()) { throw new IllegalStateException("Configuration was already executed"); } - configuration.setExecuted(true); + configuration.setInitialized(true); if (validationEnabled) { validateStages(); } @@ -142,6 +142,10 @@ public final class Execution<T extends Configuration> { * @since 2.0 */ public void executeNonBlocking() { + if (configuration.isExecuted()) { + throw new IllegalStateException("Any configuration instance may only be executed once."); + } + configuration.setExecuted(true); configurationContext.executeConfiguration(); } diff --git a/src/main/java/teetime/framework/ITraverserVisitor.java b/src/main/java/teetime/framework/ITraverserVisitor.java index 5cd376f07062437affec96bae3b31c95f777af2e..f089a258104be16050a059274784aab52ffea936 100644 --- a/src/main/java/teetime/framework/ITraverserVisitor.java +++ b/src/main/java/teetime/framework/ITraverserVisitor.java @@ -16,6 +16,7 @@ package teetime.framework; import teetime.framework.Traverser.VisitorBehavior; +import teetime.framework.pipe.DummyPipe; public interface ITraverserVisitor { @@ -23,4 +24,6 @@ public interface ITraverserVisitor { VisitorBehavior visit(AbstractPort<?> port); + void visit(DummyPipe pipe, AbstractPort<?> port); + } diff --git a/src/main/java/teetime/framework/IntraStageCollector.java b/src/main/java/teetime/framework/IntraStageCollector.java index 40ccaa5bba2919c9feb550f0bb5b545c0b6b9d52..3e95ba4daab30808d950bbe251e0b7bc63376caf 100644 --- a/src/main/java/teetime/framework/IntraStageCollector.java +++ b/src/main/java/teetime/framework/IntraStageCollector.java @@ -16,6 +16,7 @@ package teetime.framework; import teetime.framework.Traverser.VisitorBehavior; +import teetime.framework.pipe.DummyPipe; public class IntraStageCollector implements ITraverserVisitor { @@ -40,4 +41,10 @@ public class IntraStageCollector implements ITraverserVisitor { return VisitorBehavior.CONTINUE; } + @Override + public void visit(final DummyPipe pipe, final AbstractPort<?> port) { + // TODO Auto-generated method stub + + } + } diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index a768ac37def7977b929a1ecf31cb6d2371f71830..4851210d210ef264edcdce57a111ce091377db97 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -50,6 +50,8 @@ public abstract class Stage { /** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */ private Thread owningThread; + private boolean isActive; + private ConfigurationContext owningContext; ConfigurationContext getOwningContext() { @@ -181,4 +183,37 @@ public abstract class Stage { protected abstract void removeDynamicPort(InputPort<?> inputPort); + public boolean isActive() { + return isActive; + } + + void setActive(final boolean isActive) { + this.isActive = isActive; + } + + /** + * Execute this method, to add a stage to the configuration, which should be executed in a own thread. + * + * @param stage + * A arbitrary stage, which will be added to the configuration and executed in a thread. + */ + public void declareActive() { + declareActive(getId()); + } + + /** + * Execute this method, to add a stage to the configuration, which should be executed in a own thread. + * + * @param stage + * A arbitrary stage, which will be added to the configuration and executed in a thread. + * @param threadName + * A string which can be used for debugging. + */ + public void declareActive(final String threadName) { + AbstractRunnableStage runnable = AbstractRunnableStage.create(this); + Thread newThread = new TeeTimeThread(runnable, threadName); + this.setOwningThread(newThread); + this.setActive(true); + } + } diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java index 4528ed5c90e621ce93fc0a038b46b36add26eafe..c7656c57b43ede82997624b522d02105869b0fcd 100644 --- a/src/main/java/teetime/framework/ThreadService.java +++ b/src/main/java/teetime/framework/ThreadService.java @@ -61,7 +61,7 @@ class ThreadService extends AbstractService<ThreadService> { } void startStageAtRuntime(final Stage newStage) { - configuration.addThreadableStage(newStage); + newStage.declareActive(); Set<Stage> newThreadableStages = initialize(newStage); startThreads(newThreadableStages); diff --git a/src/main/java/teetime/framework/Traverser.java b/src/main/java/teetime/framework/Traverser.java index 7562b4c5f20ed58bccfa7bdb622b9f0d94e6c72d..6bb62c647a640fa40cea23a800c1378d448538d7 100644 --- a/src/main/java/teetime/framework/Traverser.java +++ b/src/main/java/teetime/framework/Traverser.java @@ -82,6 +82,7 @@ public class Traverser { private void visitAndTraverse(final AbstractPort<?> port, final Direction direction) { if (port.getPipe() instanceof DummyPipe) { + traverserVisitor.visit((DummyPipe) port.getPipe(), port); return; } VisitorBehavior behavior = traverserVisitor.visit(port); diff --git a/src/main/java/teetime/framework/pipe/CouldNotFindPipeImplException.java b/src/main/java/teetime/framework/pipe/CouldNotFindPipeImplException.java deleted file mode 100644 index bb7a50585675a8dc318ebfff3e6ab3980b8db0aa..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/pipe/CouldNotFindPipeImplException.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.pipe; - -public final class CouldNotFindPipeImplException extends RuntimeException { - - private static final long serialVersionUID = 5242260988104493402L; - - public CouldNotFindPipeImplException(final String key) { - super("Could not find any pipe implementation that conforms to the key: " + key); - } - -} diff --git a/src/main/java/teetime/framework/pipe/RelayTestPipe.java b/src/main/java/teetime/framework/pipe/RelayTestPipe.java deleted file mode 100644 index 34f1d57dbf0f7f25f599888c5c8d4c5df9662d7c..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/pipe/RelayTestPipe.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.pipe; - -import teetime.framework.AbstractInterThreadPipe; -import teetime.util.ConstructorClosure; - -final class RelayTestPipe<T> extends AbstractInterThreadPipe<T> { - - private int numInputObjects; - private final ConstructorClosure<T> inputObjectCreator; - - public RelayTestPipe(final int numInputObjects, final ConstructorClosure<T> inputObjectCreator) { - super(null, null, Integer.MAX_VALUE); - this.numInputObjects = numInputObjects; - this.inputObjectCreator = inputObjectCreator; - } - - @Override - public boolean add(final Object element) { - return false; - } - - @Override - public boolean addNonBlocking(final Object element) { - return add(element); - } - - @Override - public T removeLast() { - if (this.numInputObjects == 0) { - return null; - } else { - this.numInputObjects--; - return this.inputObjectCreator.create(); - } - } - - @Override - public boolean isEmpty() { - return (this.numInputObjects == 0); - } - - @Override - public int size() { - return this.numInputObjects; - } - -} diff --git a/src/main/java/teetime/framework/signal/StartingSignal.java b/src/main/java/teetime/framework/signal/StartingSignal.java index f8e001592ba6250696beef8201c61fd8c64307da..38ad2ce4d4011c0f36adfe7ee583e1fd3932f324 100644 --- a/src/main/java/teetime/framework/signal/StartingSignal.java +++ b/src/main/java/teetime/framework/signal/StartingSignal.java @@ -30,7 +30,7 @@ public final class StartingSignal implements ISignal { @Override public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) { - return true; + return receivedInputPorts.size() == 1; } } diff --git a/src/main/java/teetime/framework/signal/TerminatingSignal.java b/src/main/java/teetime/framework/signal/TerminatingSignal.java index f17b532a66bb26b634f5de32d35707e5a2249a32..211c57bb42fc7ad6deea6f5e5ad2818dbdbd906f 100644 --- a/src/main/java/teetime/framework/signal/TerminatingSignal.java +++ b/src/main/java/teetime/framework/signal/TerminatingSignal.java @@ -30,7 +30,7 @@ public final class TerminatingSignal implements ISignal { @Override public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) { - return receivedInputPorts.size() == allInputPorts.size(); + return receivedInputPorts.size() >= allInputPorts.size(); } } diff --git a/src/main/java/teetime/framework/test/StageTester.java b/src/main/java/teetime/framework/test/StageTester.java index fd014645078cecde0955a7c9fb6dac655aa37845..10313c679335d1f9b187fab647e69c6b2f54037f 100644 --- a/src/main/java/teetime/framework/test/StageTester.java +++ b/src/main/java/teetime/framework/test/StageTester.java @@ -91,7 +91,7 @@ public final class StageTester { connectPorts(producer.getOutputPort(), inputHolder.getPort()); } - addThreadableStage(stage); + stage.declareActive(); for (OutputHolder<?> outputHolder : outputHolders) { final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutputElements()); diff --git a/src/main/java/teetime/stage/Clock.java b/src/main/java/teetime/stage/Clock.java index 19a1dae1b818925df80277b2e9609f80aecd5a86..fd5713342d003e2616a16b764f029af8379a3d8f 100644 --- a/src/main/java/teetime/stage/Clock.java +++ b/src/main/java/teetime/stage/Clock.java @@ -18,11 +18,37 @@ package teetime.stage; import teetime.framework.AbstractProducerStage; import teetime.framework.TerminationStrategy; +/** + * This stage sends the current timestamp repeatedly with a given interval delay of {@link #intervalDelayInMs}. + * + * @stage.sketch + * + * <pre> + * +------------------------+ + * | | + * | +---+ + * | *INTERVAL* +--> | | + * | +---+ + * | | + * +------------------------+ + * </pre> + * + * @author Nelson Tavares de Sousa + * + * @stage.output Current timestamp as long. + * + */ public final class Clock extends AbstractProducerStage<Long> { private boolean initialDelayExceeded = false; + /** + * Waiting time span until first sent element. + */ private long initialDelayInMs; + /** + * Interval between two sent elements in ms. + */ private long intervalDelayInMs; @Override diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java index fe7aee22e637c533df52d90f2ee7c0078a2dcf7e..64e175d0dea46315797223ca78b9caab50207110 100644 --- a/src/main/java/teetime/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java @@ -23,12 +23,36 @@ import teetime.stage.basic.distributor.strategy.IDistributorStrategy; import teetime.stage.basic.distributor.strategy.RoundRobinStrategy2; /** - * @author Christian Wulf + * New output ports can be created by calling {@link #getNewOutputPort()}. + * + * @stage.sketch + * + * <pre> + * +----------------------------+ + * | | + * | +---+ + * | +------------> | | + * | | +---+ + * | | | + * +---+ | + * | | +-------+--- . . . . + * +---+ | + * | | | + * | | +---+ + * | +------------> | | + * | +---+ + * | | + * +----------------------------+ + * </pre> + * + * @stage.output The incoming element will be passed to output ports, which are selected by a strategy. + * + * @author Christian Wulf, Nelson Tavares de Sousa * * @since 1.0 * - * @param T - * the type of the input port and the output ports + * @param <T> + * the type of both the input and output ports */ public class Distributor<T> extends AbstractConsumerStage<T> { diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index 89bc76226e56997b30506d46edb82d704d58fe7f..9f6ca96b0009a5bbae9f15857474d9b70790830e 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -15,34 +15,50 @@ */ package teetime.stage.basic.merger; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; import teetime.framework.AbstractStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.signal.ISignal; import teetime.stage.basic.merger.strategy.IMergerStrategy; import teetime.stage.basic.merger.strategy.RoundRobinStrategy; /** * * This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port. - * For its signal handling behavior see {@link #onSignal(ISignal, InputPort)} + * New input ports can be created by calling {@link #getNewInputPort()}. * - * @author Christian Wulf, Nelson Tavares de Sousa + * @stage.sketch + * + * <pre> + * +---------------------------+ + * | | + * +---+ | + * | | +-----------+ | + * +---+ | | + * | | | + * | +---+ + * . . . . ---+-------> | | + * | +---+ + * | | | + * +---+ | | + * | | +-----------+ | + * +---+ | + * | | + * +---------------------------+ + * + * + * </pre> + * + * @author Christian Wulf * * @since 1.0 * - * @param <T> - * the type of both the input and output ports + * @param T + * the type of the input port and the output ports */ public class Merger<T> extends AbstractStage { - private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap; private final OutputPort<T> outputPort = this.createOutputPort(); private final IMergerStrategy strategy; @@ -52,7 +68,6 @@ public class Merger<T> extends AbstractStage { } public Merger(final IMergerStrategy strategy) { - this.signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>(); this.strategy = strategy; addInputPortRemovedListener(strategy); } @@ -66,42 +81,6 @@ public class Merger<T> extends AbstractStage { outputPort.send(token); } - /** - * This method is executed, if a signal is sent to a instance of this class. - * Multiple signals of one certain type are ignored, if they are sent to same port. - * Hence a signal is only passed on, when it arrived on all input ports, regardless how often. - * - * @param signal - * Signal which is sent - * - * @param inputPort - * The port which the signal was sent to - */ - @Override - public void onSignal(final ISignal signal, final InputPort<?> inputPort) { - if (logger.isTraceEnabled()) { - this.logger.trace("Got signal: " + signal + " from input port: " + inputPort); - } - - Class<? extends ISignal> signalClass = signal.getClass(); - - Set<InputPort<?>> signalReceivedInputPorts; - if (signalMap.containsKey(signalClass)) { - signalReceivedInputPorts = signalMap.get(signalClass); - } else { - signalReceivedInputPorts = new HashSet<InputPort<?>>(); - signalMap.put(signalClass, signalReceivedInputPorts); - } - - if (!signalReceivedInputPorts.add(inputPort)) { - this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort); - } - - if (signal.mayBeTriggered(signalReceivedInputPorts, getInputPorts())) { - super.onSignal(signal, inputPort); - } - } - public IMergerStrategy getMergerStrategy() { return this.strategy; } diff --git a/src/main/resources/migrate-keywords-to-teetime-2.0.sh b/src/main/resources/migrate-keywords-to-teetime-2.0.sh new file mode 100644 index 0000000000000000000000000000000000000000..3ed942af83880a0ed02860221b69e9eab40e50c4 --- /dev/null +++ b/src/main/resources/migrate-keywords-to-teetime-2.0.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +function replace(){ +echo $1 +sed -i -- '/onInitializing/d' $1 +sed -i -- 's:connectInterThreads:connectPorts:' $1 +sed -i -- 's:connectIntraThreads:connectPorts:' $1 +sed -i -- 's:AnalysisConfiguration.connectPorts:connectPorts:' $1 +sed -i -- 's:addThreadableStage:declareActive:' $1 +sed -i -- 's: AnalysisConfiguration: Configuration:' $1 +sed -i -- 's: AnalysisConfiguration: Configuration:' $1 +sed -i -- 's:import teetime.framework.AnalysisConfiguration:import teetime.framework.Configuration:' $1 +sed -i -- 's: Analysis: Execution:' $1 +sed -i -- 's: Analysis: Execution:' $1 +sed -i -- 's:import teetime.framework.Analysis:import teetime.framework.Execution:' $1 +sed -i -- 's:import teetime.stage.basic.distributor.CopyByReferenceStrategy:import teetime.stage.basic.distributor.strategy.CopyByReferenceStrategy:' $1 + +} + +echo "Migrating to TeeTime 2.0" + +export -f replace +find $1 -type f -name *.java -exec bash -c 'replace "$0"' {} \; + +echo "Done" \ No newline at end of file diff --git a/src/site/markdown/contribute.markdown b/src/site/markdown/contribute.md similarity index 100% rename from src/site/markdown/contribute.markdown rename to src/site/markdown/contribute.md diff --git a/src/site/markdown/download.markdown b/src/site/markdown/download.md similarity index 100% rename from src/site/markdown/download.markdown rename to src/site/markdown/download.md diff --git a/src/site/markdown/index.markdown b/src/site/markdown/index.md similarity index 97% rename from src/site/markdown/index.markdown rename to src/site/markdown/index.md index 8840c20d81608b4a855650e76ac8c6b983e7d3ff..947f753feee88ae44d3771b80a2b6b0a739a471f 100644 --- a/src/site/markdown/index.markdown +++ b/src/site/markdown/index.md @@ -58,7 +58,7 @@ To make working with TeeTime as pleasant and efficient as possible, this homepag You can take a closer look at: -- [TeeTime's Wiki](wiki/), or +- [TeeTime's Wiki](wiki/home.html), or - [API Documentation](stabledocs/index.html). diff --git a/src/site/markdown/logo.markdown b/src/site/markdown/logo.md similarity index 100% rename from src/site/markdown/logo.markdown rename to src/site/markdown/logo.md diff --git a/src/site/markdown/news.markdown b/src/site/markdown/news.md similarity index 100% rename from src/site/markdown/news.markdown rename to src/site/markdown/news.md diff --git a/src/site/markdown/wiki b/src/site/markdown/wiki new file mode 160000 index 0000000000000000000000000000000000000000..709c839c447a50c93b37fcc633a01297115d4823 --- /dev/null +++ b/src/site/markdown/wiki @@ -0,0 +1 @@ +Subproject commit 709c839c447a50c93b37fcc633a01297115d4823 diff --git a/src/site/resources/css/site.css b/src/site/resources/css/site.css index d51bc425d279c3a36720ea6cd8222ec006a5060d..613683d780862a3c8c0465f91841eb0226aba593 100644 --- a/src/site/resources/css/site.css +++ b/src/site/resources/css/site.css @@ -1,5 +1,3 @@ -@import url(http://fonts.googleapis.com/css?family=Pinyon+Script); - body { background-image:url(../images/bg.png); } @@ -11,4 +9,8 @@ body { .slogan { color: #7E7E7E; margin-top: 0.5em; -} \ No newline at end of file +} + +.dropdown-backdrop{ +position: static; +} diff --git a/src/site/site.xml b/src/site/site.xml index 534be27fbd7a2f05e3909b524d86619b32c4defc..1126f4c14b611de7fe9dc1b52d65428db0943bb7 100644 --- a/src/site/site.xml +++ b/src/site/site.xml @@ -4,7 +4,6 @@ <artifactId>reflow-maven-skin</artifactId> <version>1.1.1</version> </skin> - <bannerLeft> <name> <![CDATA[ @@ -51,17 +50,18 @@ <links> <item name="News" href="news.html" /> <item name="Download" href="download.html" /> - <item name="Wiki" href="wiki/" /> + <item name="Wiki" href="wiki/home.html" /> </links> <menu name="Documentation"> - <item name="JavaDoc" href="http://teetime.sourceforge.net/stabledocs/index.html" /> + <item name="JavaDoc" + href="http://teetime.sourceforge.net/stabledocs/index.html" /> <item name="Release Notes" href="changes-report.html" /> <item name="Project Dependencies" href="dependencies.html" /> <item name="License" href="license.html" /> </menu> <menu name="Get Involved"> <item name="Contribute" href="contribute.html" /> - <item name="SourceForge Project Site" href="https://sourceforge.net/projects/teetime/" /> + <item name="Visit us on GitHub" href="https://github.com/ChristianWulf/teetime/" /> <item name="Issue Tracking" href="https://build.se.informatik.uni-kiel.de/gitlab/chw/teetime/issues" /> <item name="Team" href="team-list.html" /> @@ -70,7 +70,7 @@ <item name="Home" href="index.html" /> <item name="News" href="news.html" /> <item name="Download" href="download.html" /> - <item name="Wiki" href="wiki/" /> + <item name="Wiki" href="wiki/home.html" /> </menu> <!-- <menu ref="modules"/> --> <!-- <menu ref="reports"/> --> @@ -137,7 +137,7 @@ </release> <download> <toc>top</toc> - <tocTopMax>4</tocTopMax> + <tocTopMax>5</tocTopMax> </download> <contribute> <highlightJs>false</highlightJs> diff --git a/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java b/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java index a214fadc050b675770eaa5d6f8d614df44ad7a98..af9e9c3fec67d309dfdafa9e9be0ae0aaed3968c 100644 --- a/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java +++ b/src/test/java/teetime/examples/wordcounter/WordCounterConfiguration.java @@ -83,7 +83,7 @@ public class WordCounterConfiguration extends Configuration { connectPorts(distributor.getNewOutputPort(), threadableStage.getInputPort(), 1000); connectPorts(wc.getOutputPort(), merger.getNewInputPort()); // Add WordCounter as a threadable stage, so it runs in its own thread - addThreadableStage(threadableStage.getInputPort().getOwningStage()); + threadableStage.getInputPort().getOwningStage().declareActive(); distributorPorts.add(threadableStage.getInputPort()); mergerPorts.add(wc.getOutputPort()); @@ -95,8 +95,8 @@ public class WordCounterConfiguration extends Configuration { connectPorts(merger.getOutputPort(), result.getInputPort()); // Add the first and last part to the threadable stages - addThreadableStage(init); - addThreadableStage(merger); + init.declareActive(); + merger.declareActive(); } public MonitoringThread getMonitoringThread() { diff --git a/src/test/java/teetime/framework/AbstractCompositeStageTest.java b/src/test/java/teetime/framework/AbstractCompositeStageTest.java index 18784f4b3686ec4251779bdf86271d6d6c8a87dd..8cd72834092fb08b5362b2c1e9e2fbf4ee727551 100644 --- a/src/test/java/teetime/framework/AbstractCompositeStageTest.java +++ b/src/test/java/teetime/framework/AbstractCompositeStageTest.java @@ -27,19 +27,19 @@ public class AbstractCompositeStageTest { @Ignore @Test public void testNestedStages() { - Execution<NestesConfig> exec = new Execution<NestesConfig>(new NestesConfig()); + Execution<NestedConf> exec = new Execution<NestedConf>(new NestedConf()); // assertThat(exec.getConfiguration().getContext().getThreadableStages().size(), is(3)); } - private class NestesConfig extends Configuration { + private class NestedConf extends Configuration { private final InitialElementProducer<Object> init; - private final Sink sink; + private final Sink<Object> sink; private final TestNestingCompositeStage compositeStage; - public NestesConfig() { + public NestedConf() { init = new InitialElementProducer<Object>(new Object()); - sink = new Sink(); + sink = new Sink<Object>(); compositeStage = new TestNestingCompositeStage(); connectPorts(init.getOutputPort(), compositeStage.firstCompositeStage.firstCounter.getInputPort()); connectPorts(compositeStage.secondCompositeStage.secondCounter.getOutputPort(), sink.getInputPort()); @@ -52,7 +52,7 @@ public class AbstractCompositeStageTest { private final Counter firstCounter = new Counter(); public TestCompositeOneStage() { - addThreadableStage(firstCounter); + firstCounter.declareActive(); } } @@ -63,7 +63,7 @@ public class AbstractCompositeStageTest { private final Counter secondCounter = new Counter(); public TestCompositeTwoStage() { - addThreadableStage(firstCounter); + firstCounter.declareActive(); connectPorts(firstCounter.getOutputPort(), secondCounter.getInputPort()); } diff --git a/src/test/java/teetime/framework/ExecutionTest.java b/src/test/java/teetime/framework/ExecutionTest.java index c256f27b29d92b49ad2087a6a64b76ee88847284..617ab6a11a0fb4382261022ad6017fd68aad1d51 100644 --- a/src/test/java/teetime/framework/ExecutionTest.java +++ b/src/test/java/teetime/framework/ExecutionTest.java @@ -118,7 +118,7 @@ public class ExecutionTest { public AnalysisTestConfig(final boolean inter) { connectPorts(init.getOutputPort(), sink.getInputPort()); if (inter) { - addThreadableStage(sink); + sink.declareActive(); } } } @@ -143,7 +143,7 @@ public class ExecutionTest { connectPorts(init.getOutputPort(), iof.getInputPort()); connectPorts(iof.getMatchedOutputPort(), sink.getInputPort()); connectPorts(init.createOutputPort(), sink.createInputPort()); - addThreadableStage(iof); + iof.declareActive(); } } @@ -191,7 +191,7 @@ public class ExecutionTest { stageWithNamedThread = new InitialElementProducer<Object>(new Object()); Sink<Object> sink = new Sink<Object>(); - addThreadableStage(stageWithNamedThread, "TestName"); + stageWithNamedThread.declareActive("TestName"); connectPorts(stageWithNamedThread.getOutputPort(), sink.getInputPort()); } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index 3f61179279e809343854715e2419fc23e7e9bb16..7978645126e233b2443590fac2513643f1013457 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -18,6 +18,7 @@ package teetime.framework; import java.util.ArrayList; import java.util.List; +import teetime.framework.pipe.IPipe; import teetime.framework.pipe.SpScPipeFactory; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; @@ -30,14 +31,15 @@ public class RunnableConsumerStageTestConfiguration extends Configuration { public RunnableConsumerStageTestConfiguration(final Integer... inputElements) { InitialElementProducer<Integer> producer = new InitialElementProducer<Integer>(inputElements); if (inputElements.length > 0) { - addThreadableStage(producer); + producer.declareActive(); } CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements); - addThreadableStage(collectorSink); + collectorSink.declareActive(); // Can not use createPorts, as the if condition above will lead to an exception - new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort()); + IPipe pipe = new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort()); + registerCustomPipe((AbstractPipe<?>) pipe); this.collectorSink = collectorSink; } diff --git a/src/test/java/teetime/framework/StageTest.java b/src/test/java/teetime/framework/StageTest.java index 937a3b2fd0d6c944354ca426a5e6371f3478c0bb..3081b34678e7e3d81ce154d6ec65b9335f61d708 100644 --- a/src/test/java/teetime/framework/StageTest.java +++ b/src/test/java/teetime/framework/StageTest.java @@ -18,7 +18,9 @@ package teetime.framework; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import org.junit.Assert; import org.junit.Test; @@ -53,6 +55,13 @@ public class StageTest { assertEquals(tc.init.exceptionListener, tc.delay.exceptionListener); } + @Test + public void testActiveFlag() { + TestConfig config = new TestConfig(); + assertTrue(config.init.isActive()); + assertFalse(config.delay.isActive()); + } + private static class TestConfig extends Configuration { public final DelayAndTerminate delay; public InitialElementProducer<String> init; diff --git a/src/test/java/teetime/framework/TerminationTest.java b/src/test/java/teetime/framework/TerminationTest.java index ac8767608de4dfc61647426a8b7579929dc7fd70..f08b772f4e252518e3ff3772b910c249559fa0f2 100644 --- a/src/test/java/teetime/framework/TerminationTest.java +++ b/src/test/java/teetime/framework/TerminationTest.java @@ -55,11 +55,11 @@ public class TerminationTest { connectPorts(init.getOutputPort(), firstProp.getInputPort()); connectPorts(firstProp.getOutputPort(), sinkStage.getInputPort(), capacity); connectPorts(sinkStage.getOutputPort(), finalProp.getInputPort()); - addThreadableStage(sinkStage); + sinkStage.declareActive(); } else { Sink<Integer> sink = new Sink<Integer>(); connectPorts(init.getOutputPort(), sink.getInputPort(), capacity); - addThreadableStage(sink); + sink.declareActive(); } } diff --git a/src/test/java/teetime/framework/TraverserTest.java b/src/test/java/teetime/framework/TraverserTest.java index 27878602463535c0cd8a37df0254abc9274fac68..b4f3c0de2e55ca314a1ab88ddf10e5a7c4610d0b 100644 --- a/src/test/java/teetime/framework/TraverserTest.java +++ b/src/test/java/teetime/framework/TraverserTest.java @@ -86,14 +86,14 @@ public class TraverserTest { connectPorts(distributor.getNewOutputPort(), wc.getInputPort()); connectPorts(wc.getOutputPort(), merger.getNewInputPort()); // Add WordCounter as a threadable stage, so it runs in its own thread - addThreadableStage(wc.getInputPort().getOwningStage()); + wc.getInputPort().getOwningStage().declareActive(); } // Connect the stages of the last part connectPorts(merger.getOutputPort(), result.getInputPort()); // Add the first and last part to the threadable stages - addThreadableStage(merger); + merger.declareActive(); } } diff --git a/src/test/java/teetime/framework/WaitStrategyConfiguration.java b/src/test/java/teetime/framework/WaitStrategyConfiguration.java index 02730ddeb2d69a5f8156d200fb53196581ab7d1e..507dc935b7fa48f71e161634a9cf943e1a8e6b81 100644 --- a/src/test/java/teetime/framework/WaitStrategyConfiguration.java +++ b/src/test/java/teetime/framework/WaitStrategyConfiguration.java @@ -29,13 +29,13 @@ class WaitStrategyConfiguration extends Configuration { public WaitStrategyConfiguration(final long initialDelayInMs, final Object... elements) { Stage producer = buildProducer(elements); - addThreadableStage(producer); + producer.declareActive(); Stage consumer = buildConsumer(delay); - addThreadableStage(consumer); + consumer.declareActive(); Clock clock = buildClock(initialDelayInMs, delay); - addThreadableStage(clock); + clock.declareActive(); } private Clock buildClock(final long initialDelayInMs, final Delay<Object> delay) { diff --git a/src/test/java/teetime/framework/YieldStrategyConfiguration.java b/src/test/java/teetime/framework/YieldStrategyConfiguration.java index 46af8c04b89bc306b66063f6186235f10ab9b0b8..a44f45bf89d7b818bbef0c59bb4482a3ca471c49 100644 --- a/src/test/java/teetime/framework/YieldStrategyConfiguration.java +++ b/src/test/java/teetime/framework/YieldStrategyConfiguration.java @@ -26,10 +26,10 @@ class YieldStrategyConfiguration extends Configuration { public YieldStrategyConfiguration(final Object... elements) { InitialElementProducer<Object> producer = buildProducer(elements); - addThreadableStage(producer); + producer.declareActive(); Stage consumer = buildConsumer(producer); - addThreadableStage(consumer); + consumer.declareActive(); } private InitialElementProducer<Object> buildProducer(final Object... elements) { diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java index 9229eb758a8a5cb16a7ed0618e84e6e62ea10841..79f7ff1dd23f5ed9006ed4bd1d78f54e0b946c06 100644 --- a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java +++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java @@ -33,8 +33,8 @@ public class ExceptionTestConfiguration extends Configuration { connectPorts(first.getOutputPort(), second.getInputPort()); // this.addThreadableStage(new ExceptionTestStage()); - this.addThreadableStage(second); - this.addThreadableStage(third); + second.declareActive(); + third.declareActive(); } } diff --git a/src/test/java/teetime/stage/InstanceOfFilterTest.java b/src/test/java/teetime/stage/InstanceOfFilterTest.java index 8389a8ba92d73c33f8349e0d51ba46b3a71c8ec7..08d48a227d36e76194218714a6a0c94b64fb691f 100644 --- a/src/test/java/teetime/stage/InstanceOfFilterTest.java +++ b/src/test/java/teetime/stage/InstanceOfFilterTest.java @@ -30,7 +30,6 @@ import org.junit.Test; import teetime.framework.Configuration; import teetime.framework.Execution; -import teetime.framework.ExecutionException; /** * @author Nils Christian Ehmke @@ -109,15 +108,10 @@ public class InstanceOfFilterTest { } @Test - public void filterShouldSendToBothOutputPorts() throws Exception { + public void filterShouldSendToBothOutputPorts() { InstanceOfFilterTestConfig config = new InstanceOfFilterTestConfig(); Execution<InstanceOfFilterTestConfig> execution = new Execution<InstanceOfFilterTestConfig>(config); - try { - execution.executeBlocking(); - } catch (ExecutionException e) { - // Collection<ThreadThrowableContainer> thrownExceptions = e.getThrownExceptions(); - // TODO: handle exception - } + execution.executeBlocking(); } private static class InstanceOfFilterTestConfig extends Configuration { diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java index 26a61ce6603a9e6fc2eb0ec631a81c172815b8b8..f8df4b0751389d020d4eb695adac3c5853f7f874 100644 --- a/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java +++ b/src/test/java/teetime/stage/basic/distributor/dynamic/DynamicDistributorTest.java @@ -137,8 +137,8 @@ public class DynamicDistributorTest { connectPorts(initialElementProducer.getOutputPort(), distributor.getInputPort()); connectPorts(distributor.getNewOutputPort(), collectorSink.getInputPort()); - addThreadableStage(distributor); - addThreadableStage(collectorSink); + distributor.declareActive(); + collectorSink.declareActive(); for (PortAction<DynamicDistributor<T>> a : inputActions) { distributor.addPortActionRequest(a); diff --git a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java index 5d8cb94894df1dd0857f778b3a2e32e93c18187c..fdff183633c7404cd6f21fbca3a17986d9ad68e2 100644 --- a/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java +++ b/src/test/java/teetime/stage/basic/merger/dynamic/DynamicMergerTest.java @@ -118,7 +118,7 @@ public class DynamicMergerTest { connectPorts(initialElementProducer.getOutputPort(), merger.getNewInputPort()); connectPorts(merger.getOutputPort(), collectorSink.getInputPort()); - addThreadableStage(merger); + merger.declareActive(); for (PortAction<DynamicMerger<T>> a : inputActions) { boolean added = merger.addPortActionRequest(a);