diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 1167a87201072af56b9d7063970fcb00862fa35f..e1266907d20690e39e7ef9fd2c8a808ce314e09f 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -23,9 +23,7 @@ import java.util.List; import java.util.Set; import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.framework.pipe.SingleElementPipeFactory; import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; @@ -41,8 +39,7 @@ import teetime.framework.validation.InvalidPortConnection; @Deprecated public abstract class AbstractCompositeStage extends Stage { - private static final IPipeFactory INTRA_PIPE_FACTORY = PipeFactoryRegistry.INSTANCE - .getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + private static final IPipeFactory INTRA_PIPE_FACTORY = new SingleElementPipeFactory(); private final Set<Stage> containingStages = new HashSet<Stage>(); private final Set<Stage> lastStages = new HashSet<Stage>(); diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index 78375efac944c52a8983dac250f9ea3dd524154c..6dc3e14cabc13e4623aac2e9c1f21fecf27b7cc6 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -28,6 +28,9 @@ import org.slf4j.LoggerFactory; import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.exceptionHandling.IExceptionListenerFactory; import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.SingleElementPipeFactory; +import teetime.framework.pipe.SpScPipeFactory; import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.ValidatingSignal; import teetime.framework.validation.AnalysisNotValidException; @@ -61,6 +64,9 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught private boolean initialized; + private final IPipeFactory interThreadPipeFactory = new SpScPipeFactory(); + private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); + /** * Creates a new {@link Analysis} that skips validating the port connections and uses the default listener. * @@ -121,6 +127,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } initialized = true; + instantiatePipes(); + final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); if (threadableStageJobs.isEmpty()) { throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); @@ -163,6 +171,17 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } + private void instantiatePipes() { + List<Stage> threadableStageJobs = configuration.getThreadableStageJobs(); + for (Pair<OutputPort, InputPort> connection : configuration.getConnections()) { + if (threadableStageJobs.contains(connection.getSecond().getOwningStage())) { + interThreadPipeFactory.create(connection.getFirst(), connection.getSecond()); + } else { + intraThreadPipeFactory.create(connection.getFirst(), connection.getSecond()); + } + } + } + private Thread createThread(final AbstractRunnableStage runnable, final String name) { final Thread thread = new Thread(runnable); thread.setUncaughtExceptionHandler(this); diff --git a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java index e6b0f9d7e4e26f34295dd3ebe7a09e7f11020927..32bec67fd86676965729ee340e4bcf1a2c5b9c34 100644 --- a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java +++ b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java @@ -43,12 +43,12 @@ public class TokenizerConfiguration extends AnalysisConfiguration { final Tokenizer tokenizer = new Tokenizer(" "); this.counter = new Counter<String>(); - connectIntraThreads(init.getOutputPort(), f2b.getInputPort()); - connectIntraThreads(f2b.getOutputPort(), decomp.getInputPort()); - connectIntraThreads(decomp.getOutputPort(), decrypt.getInputPort()); - connectIntraThreads(decrypt.getOutputPort(), b2s.getInputPort()); - connectIntraThreads(b2s.getOutputPort(), tokenizer.getInputPort()); - connectIntraThreads(tokenizer.getOutputPort(), this.counter.getInputPort()); + connectStages(init.getOutputPort(), f2b.getInputPort()); + connectStages(f2b.getOutputPort(), decomp.getInputPort()); + connectStages(decomp.getOutputPort(), decrypt.getInputPort()); + connectStages(decrypt.getOutputPort(), b2s.getInputPort()); + connectStages(b2s.getOutputPort(), tokenizer.getInputPort()); + connectStages(tokenizer.getOutputPort(), this.counter.getInputPort()); this.addThreadableStage(init); }