Skip to content
Snippets Groups Projects
Commit 4e8f8745 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

first draft... needs further testing

parent 6cd843df
No related branches found
No related tags found
No related merge requests found
......@@ -23,9 +23,7 @@ import java.util.List;
import java.util.Set;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.framework.pipe.SingleElementPipeFactory;
import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection;
......@@ -41,8 +39,7 @@ import teetime.framework.validation.InvalidPortConnection;
@Deprecated
public abstract class AbstractCompositeStage extends Stage {
private static final IPipeFactory INTRA_PIPE_FACTORY = PipeFactoryRegistry.INSTANCE
.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
private static final IPipeFactory INTRA_PIPE_FACTORY = new SingleElementPipeFactory();
private final Set<Stage> containingStages = new HashSet<Stage>();
private final Set<Stage> lastStages = new HashSet<Stage>();
......
......@@ -28,6 +28,9 @@ import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.AbstractExceptionListener;
import teetime.framework.exceptionHandling.IExceptionListenerFactory;
import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.SingleElementPipeFactory;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.ValidatingSignal;
import teetime.framework.validation.AnalysisNotValidException;
......@@ -61,6 +64,9 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
private boolean initialized;
private final IPipeFactory interThreadPipeFactory = new SpScPipeFactory();
private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory();
/**
* Creates a new {@link Analysis} that skips validating the port connections and uses the default listener.
*
......@@ -121,6 +127,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
}
initialized = true;
instantiatePipes();
final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs();
if (threadableStageJobs.isEmpty()) {
throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage.");
......@@ -163,6 +171,17 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
}
private void instantiatePipes() {
List<Stage> threadableStageJobs = configuration.getThreadableStageJobs();
for (Pair<OutputPort, InputPort> connection : configuration.getConnections()) {
if (threadableStageJobs.contains(connection.getSecond().getOwningStage())) {
interThreadPipeFactory.create(connection.getFirst(), connection.getSecond());
} else {
intraThreadPipeFactory.create(connection.getFirst(), connection.getSecond());
}
}
}
private Thread createThread(final AbstractRunnableStage runnable, final String name) {
final Thread thread = new Thread(runnable);
thread.setUncaughtExceptionHandler(this);
......
......@@ -43,12 +43,12 @@ public class TokenizerConfiguration extends AnalysisConfiguration {
final Tokenizer tokenizer = new Tokenizer(" ");
this.counter = new Counter<String>();
connectIntraThreads(init.getOutputPort(), f2b.getInputPort());
connectIntraThreads(f2b.getOutputPort(), decomp.getInputPort());
connectIntraThreads(decomp.getOutputPort(), decrypt.getInputPort());
connectIntraThreads(decrypt.getOutputPort(), b2s.getInputPort());
connectIntraThreads(b2s.getOutputPort(), tokenizer.getInputPort());
connectIntraThreads(tokenizer.getOutputPort(), this.counter.getInputPort());
connectStages(init.getOutputPort(), f2b.getInputPort());
connectStages(f2b.getOutputPort(), decomp.getInputPort());
connectStages(decomp.getOutputPort(), decrypt.getInputPort());
connectStages(decrypt.getOutputPort(), b2s.getInputPort());
connectStages(b2s.getOutputPort(), tokenizer.getInputPort());
connectStages(tokenizer.getOutputPort(), this.counter.getInputPort());
this.addThreadableStage(init);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment