diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index ed595de60d4a551a37086133e79a61a103be5591..5385dab650b1ab89d227c2cd7d0d76b8c5f3d0fd 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -31,6 +31,7 @@ import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.SingleElementPipeFactory; import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.pipe.UnboundedSpScPipeFactory; import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.ValidatingSignal; import teetime.framework.validation.AnalysisNotValidException; @@ -66,7 +67,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught private boolean initialized; - private final IPipeFactory interThreadPipeFactory = new SpScPipeFactory(); + private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); + private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); /** @@ -175,11 +177,15 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught private void instantiatePipes() { List<Stage> threadableStageJobs = configuration.getThreadableStageJobs(); - for (Pair<OutputPort, InputPort> connection : configuration.getConnections()) { - if (threadableStageJobs.contains(connection.getSecond().getOwningStage())) { - interThreadPipeFactory.create(connection.getFirst(), connection.getSecond()); + for (Pair<Pair<OutputPort, InputPort>, Integer> connection : configuration.getConnections()) { + if (threadableStageJobs.contains(connection.getFirst().getSecond().getOwningStage())) { + if (connection.getSecond() != 0) { + interBoundedThreadPipeFactory.create(connection.getFirst().getFirst(), connection.getFirst().getSecond(), connection.getSecond()); + } else { + interUnboundedThreadPipeFactory.create(connection.getFirst().getFirst(), connection.getFirst().getSecond(), connection.getSecond()); + } } else { - intraThreadPipeFactory.create(connection.getFirst(), connection.getSecond()); + intraThreadPipeFactory.create(connection.getFirst().getFirst(), connection.getFirst().getSecond()); } } } diff --git a/src/main/java/teetime/framework/AnalysisConfiguration.java b/src/main/java/teetime/framework/AnalysisConfiguration.java index b8c6bee5e013e55d4aa7c078153098dafcb5e2a1..ac0b85b1862e14affb829299b4d19a8cf9fede9b 100644 --- a/src/main/java/teetime/framework/AnalysisConfiguration.java +++ b/src/main/java/teetime/framework/AnalysisConfiguration.java @@ -32,7 +32,7 @@ import teetime.util.Pair; public abstract class AnalysisConfiguration { private final List<Stage> threadableStageJobs = new LinkedList<Stage>(); - private final List<Pair<OutputPort, InputPort>> connections = new LinkedList<Pair<OutputPort, InputPort>>(); + private final List<Pair<Pair<OutputPort, InputPort>, Integer>> connections = new LinkedList<Pair<Pair<OutputPort, InputPort>, Integer>>(); @SuppressWarnings("deprecation") private static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; @@ -146,7 +146,12 @@ public abstract class AnalysisConfiguration { } protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - connections.add(new Pair<OutputPort, InputPort>(sourcePort, targetPort)); + connectPorts(sourcePort, targetPort, 4); + } + + protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + connections.add(new Pair(new Pair<OutputPort, InputPort>(sourcePort, targetPort), capacity)); + } /** @@ -154,7 +159,7 @@ public abstract class AnalysisConfiguration { * * @return a list of pairs of Out- and InputPorts, which are connected */ - protected List<Pair<OutputPort, InputPort>> getConnections() { + protected List<Pair<Pair<OutputPort, InputPort>, Integer>> getConnections() { return connections; }