From 4e8f8745a51f487a6063d17c5d06f649a5b20341 Mon Sep 17 00:00:00 2001 From: Nelson Tavares de Sousa <stu103017@mail.uni-kiel.de> Date: Tue, 26 May 2015 14:03:31 +0200 Subject: [PATCH] first draft... needs further testing --- .../framework/AbstractCompositeStage.java | 7 ++----- src/main/java/teetime/framework/Analysis.java | 19 +++++++++++++++++++ .../tokenizer/TokenizerConfiguration.java | 12 ++++++------ 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 1167a872..e1266907 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -23,9 +23,7 @@ import java.util.List; import java.util.Set; import teetime.framework.pipe.IPipeFactory; -import teetime.framework.pipe.PipeFactoryRegistry; -import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; -import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.framework.pipe.SingleElementPipeFactory; import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; @@ -41,8 +39,7 @@ import teetime.framework.validation.InvalidPortConnection; @Deprecated public abstract class AbstractCompositeStage extends Stage { - private static final IPipeFactory INTRA_PIPE_FACTORY = PipeFactoryRegistry.INSTANCE - .getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + private static final IPipeFactory INTRA_PIPE_FACTORY = new SingleElementPipeFactory(); private final Set<Stage> containingStages = new HashSet<Stage>(); private final Set<Stage> lastStages = new HashSet<Stage>(); diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index 78375efa..6dc3e14c 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -28,6 +28,9 @@ import org.slf4j.LoggerFactory; import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.exceptionHandling.IExceptionListenerFactory; import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.SingleElementPipeFactory; +import teetime.framework.pipe.SpScPipeFactory; import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.ValidatingSignal; import teetime.framework.validation.AnalysisNotValidException; @@ -61,6 +64,9 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught private boolean initialized; + private final IPipeFactory interThreadPipeFactory = new SpScPipeFactory(); + private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); + /** * Creates a new {@link Analysis} that skips validating the port connections and uses the default listener. * @@ -121,6 +127,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } initialized = true; + instantiatePipes(); + final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); if (threadableStageJobs.isEmpty()) { throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); @@ -163,6 +171,17 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } + private void instantiatePipes() { + List<Stage> threadableStageJobs = configuration.getThreadableStageJobs(); + for (Pair<OutputPort, InputPort> connection : configuration.getConnections()) { + if (threadableStageJobs.contains(connection.getSecond().getOwningStage())) { + interThreadPipeFactory.create(connection.getFirst(), connection.getSecond()); + } else { + intraThreadPipeFactory.create(connection.getFirst(), connection.getSecond()); + } + } + } + private Thread createThread(final AbstractRunnableStage runnable, final String name) { final Thread thread = new Thread(runnable); thread.setUncaughtExceptionHandler(this); diff --git a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java index e6b0f9d7..32bec67f 100644 --- a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java +++ b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java @@ -43,12 +43,12 @@ public class TokenizerConfiguration extends AnalysisConfiguration { final Tokenizer tokenizer = new Tokenizer(" "); this.counter = new Counter<String>(); - connectIntraThreads(init.getOutputPort(), f2b.getInputPort()); - connectIntraThreads(f2b.getOutputPort(), decomp.getInputPort()); - connectIntraThreads(decomp.getOutputPort(), decrypt.getInputPort()); - connectIntraThreads(decrypt.getOutputPort(), b2s.getInputPort()); - connectIntraThreads(b2s.getOutputPort(), tokenizer.getInputPort()); - connectIntraThreads(tokenizer.getOutputPort(), this.counter.getInputPort()); + connectStages(init.getOutputPort(), f2b.getInputPort()); + connectStages(f2b.getOutputPort(), decomp.getInputPort()); + connectStages(decomp.getOutputPort(), decrypt.getInputPort()); + connectStages(decrypt.getOutputPort(), b2s.getInputPort()); + connectStages(b2s.getOutputPort(), tokenizer.getInputPort()); + connectStages(tokenizer.getOutputPort(), this.counter.getInputPort()); this.addThreadableStage(init); } -- GitLab