diff --git a/src/main/java/teetime/framework/Configuration.java b/src/main/java/teetime/framework/Configuration.java index 7b0b5f40d46186e96941401eaf40166aabf1f6fe..6ebe6684d00a1db4c88924806f99cf0f5839cfc9 100644 --- a/src/main/java/teetime/framework/Configuration.java +++ b/src/main/java/teetime/framework/Configuration.java @@ -16,13 +16,16 @@ package teetime.framework; /** - * Represents a configuration of connected stages. + * Represents a configuration of connected stages. Available to be extended. * * @author Christian Wulf, Nelson Tavares de Sousa * * @since 2.0 * */ -public abstract class Configuration extends AbstractCompositeStage { +public class Configuration extends AbstractCompositeStage { + protected Configuration() { + // protected ctor to prevent direct instantiation. + } } diff --git a/src/main/java/teetime/framework/ExecutionInstantiation.java b/src/main/java/teetime/framework/ExecutionInstantiation.java index 4ecf1b2097cb1173d6af22fa49c92c571a8ec084..cce003efbed341c2de6266cbdaf4a83955fb7518 100644 --- a/src/main/java/teetime/framework/ExecutionInstantiation.java +++ b/src/main/java/teetime/framework/ExecutionInstantiation.java @@ -31,6 +31,7 @@ import teetime.framework.pipe.UnboundedSpScPipeFactory; class ExecutionInstantiation { private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionInstantiation.class); + private static final int DEFAULT_COLOR = 0; private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); @@ -42,35 +43,16 @@ class ExecutionInstantiation { this.configuration = configuration; } - @SuppressWarnings("rawtypes") - Integer colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage, final ConfigurationContext configuration) { - Integer createdConnections = new Integer(0); - Set<Stage> threadableStageJobs = configuration.getThreadableStages().keySet(); + @SuppressWarnings({ "rawtypes" }) + int colorAndConnectStages(final int color, final Map<Stage, Integer> colors, final Stage threadableStage, final ConfigurationContext configuration) { + Set<Stage> threadableStages = configuration.getThreadableStages().keySet(); + + int createdConnections = 0; for (OutputPort outputPort : threadableStage.getOutputPorts()) { if (outputPort.pipe != null) { if (outputPort.pipe instanceof InstantiationPipe) { InstantiationPipe pipe = (InstantiationPipe) outputPort.pipe; - Stage targetStage = pipe.getTargetPort().getOwningStage(); - Integer targetColor = new Integer(0); - if (colors.containsKey(targetStage)) { - targetColor = colors.get(targetStage); - } - if (threadableStageJobs.contains(targetStage) && targetColor.compareTo(i) != 0) { - if (pipe.getCapacity() != 0) { - interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.getCapacity()); - } else { - interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4); - } - } else { - if (colors.containsKey(targetStage)) { - if (!colors.get(targetStage).equals(i)) { - throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") - } - } - intraThreadPipeFactory.create(outputPort, pipe.getTargetPort()); - colors.put(targetStage, i); - createdConnections += colorAndConnectStages(i, colors, targetStage, configuration); - } + createdConnections += processPipe(color, colors, configuration, threadableStages, outputPort, pipe); createdConnections++; } } @@ -79,15 +61,44 @@ class ExecutionInstantiation { return createdConnections; } + @SuppressWarnings({ "unchecked", "rawtypes" }) + private int processPipe(final int color, final Map<Stage, Integer> colors, final ConfigurationContext configuration, final Set<Stage> threadableStages, + final OutputPort outputPort, final InstantiationPipe pipe) { + Stage targetStage = pipe.getTargetPort().getOwningStage(); + + int targetColor = DEFAULT_COLOR; + if (colors.containsKey(targetStage)) { + targetColor = colors.get(targetStage); + } + + if (threadableStages.contains(targetStage) && targetColor != color) { + if (pipe.getCapacity() != 0) { + interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.getCapacity()); + } else { + interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4); + } + } else { + if (colors.containsKey(targetStage)) { + if (!colors.get(targetStage).equals(color)) { + throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") + } + } + intraThreadPipeFactory.create(outputPort, pipe.getTargetPort()); + colors.put(targetStage, color); + return colorAndConnectStages(color, colors, targetStage, configuration); + } + return 0; + } + void instantiatePipes() { - Integer i = new Integer(0); + int color = DEFAULT_COLOR; Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); Set<Stage> threadableStageJobs = configuration.getThreadableStages().keySet(); Integer createdConnections = 0; for (Stage threadableStage : threadableStageJobs) { - i++; - colors.put(threadableStage, i); - createdConnections = colorAndConnectStages(i, colors, threadableStage, configuration); + color++; + colors.put(threadableStage, color); + createdConnections = colorAndConnectStages(color, colors, threadableStage, configuration); } LOGGER.debug("Created " + createdConnections + " connections"); }