From 1a8c60387bd10f85d0e8184e785306bad7751f48 Mon Sep 17 00:00:00 2001 From: Nelson Tavares de Sousa <stu103017@mail.uni-kiel.de> Date: Tue, 2 Jun 2015 11:37:05 +0200 Subject: [PATCH] introduced method to set the pipe's capacity --- src/main/java/teetime/framework/Analysis.java | 16 +++++++++++----- .../teetime/framework/AnalysisConfiguration.java | 11 ++++++++--- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index ed595de6..5385dab6 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -31,6 +31,7 @@ import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.SingleElementPipeFactory; import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.pipe.UnboundedSpScPipeFactory; import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.ValidatingSignal; import teetime.framework.validation.AnalysisNotValidException; @@ -66,7 +67,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught private boolean initialized; - private final IPipeFactory interThreadPipeFactory = new SpScPipeFactory(); + private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); + private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); /** @@ -175,11 +177,15 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught private void instantiatePipes() { List<Stage> threadableStageJobs = configuration.getThreadableStageJobs(); - for (Pair<OutputPort, InputPort> connection : configuration.getConnections()) { - if (threadableStageJobs.contains(connection.getSecond().getOwningStage())) { - interThreadPipeFactory.create(connection.getFirst(), connection.getSecond()); + for (Pair<Pair<OutputPort, InputPort>, Integer> connection : configuration.getConnections()) { + if (threadableStageJobs.contains(connection.getFirst().getSecond().getOwningStage())) { + if (connection.getSecond() != 0) { + interBoundedThreadPipeFactory.create(connection.getFirst().getFirst(), connection.getFirst().getSecond(), connection.getSecond()); + } else { + interUnboundedThreadPipeFactory.create(connection.getFirst().getFirst(), connection.getFirst().getSecond(), connection.getSecond()); + } } else { - intraThreadPipeFactory.create(connection.getFirst(), connection.getSecond()); + intraThreadPipeFactory.create(connection.getFirst().getFirst(), connection.getFirst().getSecond()); } } } diff --git a/src/main/java/teetime/framework/AnalysisConfiguration.java b/src/main/java/teetime/framework/AnalysisConfiguration.java index b8c6bee5..ac0b85b1 100644 --- a/src/main/java/teetime/framework/AnalysisConfiguration.java +++ b/src/main/java/teetime/framework/AnalysisConfiguration.java @@ -32,7 +32,7 @@ import teetime.util.Pair; public abstract class AnalysisConfiguration { private final List<Stage> threadableStageJobs = new LinkedList<Stage>(); - private final List<Pair<OutputPort, InputPort>> connections = new LinkedList<Pair<OutputPort, InputPort>>(); + private final List<Pair<Pair<OutputPort, InputPort>, Integer>> connections = new LinkedList<Pair<Pair<OutputPort, InputPort>, Integer>>(); @SuppressWarnings("deprecation") private static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; @@ -146,7 +146,12 @@ public abstract class AnalysisConfiguration { } protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - connections.add(new Pair<OutputPort, InputPort>(sourcePort, targetPort)); + connectPorts(sourcePort, targetPort, 4); + } + + protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + connections.add(new Pair(new Pair<OutputPort, InputPort>(sourcePort, targetPort), capacity)); + } /** @@ -154,7 +159,7 @@ public abstract class AnalysisConfiguration { * * @return a list of pairs of Out- and InputPorts, which are connected */ - protected List<Pair<OutputPort, InputPort>> getConnections() { + protected List<Pair<Pair<OutputPort, InputPort>, Integer>> getConnections() { return connections; } -- GitLab