Skip to content
Snippets Groups Projects
Commit 1a8c6038 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

introduced method to set the pipe's capacity

parent d3afffc0
No related branches found
No related tags found
No related merge requests found
......@@ -31,6 +31,7 @@ import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.SingleElementPipeFactory;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.pipe.UnboundedSpScPipeFactory;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.ValidatingSignal;
import teetime.framework.validation.AnalysisNotValidException;
......@@ -66,7 +67,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
private boolean initialized;
private final IPipeFactory interThreadPipeFactory = new SpScPipeFactory();
private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory();
private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory();
private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory();
/**
......@@ -175,11 +177,15 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
private void instantiatePipes() {
List<Stage> threadableStageJobs = configuration.getThreadableStageJobs();
for (Pair<OutputPort, InputPort> connection : configuration.getConnections()) {
if (threadableStageJobs.contains(connection.getSecond().getOwningStage())) {
interThreadPipeFactory.create(connection.getFirst(), connection.getSecond());
for (Pair<Pair<OutputPort, InputPort>, Integer> connection : configuration.getConnections()) {
if (threadableStageJobs.contains(connection.getFirst().getSecond().getOwningStage())) {
if (connection.getSecond() != 0) {
interBoundedThreadPipeFactory.create(connection.getFirst().getFirst(), connection.getFirst().getSecond(), connection.getSecond());
} else {
interUnboundedThreadPipeFactory.create(connection.getFirst().getFirst(), connection.getFirst().getSecond(), connection.getSecond());
}
} else {
intraThreadPipeFactory.create(connection.getFirst(), connection.getSecond());
intraThreadPipeFactory.create(connection.getFirst().getFirst(), connection.getFirst().getSecond());
}
}
}
......
......@@ -32,7 +32,7 @@ import teetime.util.Pair;
public abstract class AnalysisConfiguration {
private final List<Stage> threadableStageJobs = new LinkedList<Stage>();
private final List<Pair<OutputPort, InputPort>> connections = new LinkedList<Pair<OutputPort, InputPort>>();
private final List<Pair<Pair<OutputPort, InputPort>, Integer>> connections = new LinkedList<Pair<Pair<OutputPort, InputPort>, Integer>>();
@SuppressWarnings("deprecation")
private static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE;
......@@ -146,7 +146,12 @@ public abstract class AnalysisConfiguration {
}
protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
connections.add(new Pair<OutputPort, InputPort>(sourcePort, targetPort));
connectPorts(sourcePort, targetPort, 4);
}
protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
connections.add(new Pair(new Pair<OutputPort, InputPort>(sourcePort, targetPort), capacity));
}
/**
......@@ -154,7 +159,7 @@ public abstract class AnalysisConfiguration {
*
* @return a list of pairs of Out- and InputPorts, which are connected
*/
protected List<Pair<OutputPort, InputPort>> getConnections() {
protected List<Pair<Pair<OutputPort, InputPort>, Integer>> getConnections() {
return connections;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment