diff --git a/src/main/java/teetime/framework/ExecutionInstantiation.java b/src/main/java/teetime/framework/ExecutionInstantiation.java index e6105adbc0861bbbe9113e758d7e9d2aa025e3fb..abcf959939d7e605dc2e1cf72745e1900b53b448 100644 --- a/src/main/java/teetime/framework/ExecutionInstantiation.java +++ b/src/main/java/teetime/framework/ExecutionInstantiation.java @@ -19,9 +19,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.InstantiationPipe; import teetime.framework.pipe.SingleElementPipeFactory; @@ -30,77 +27,86 @@ import teetime.framework.pipe.UnboundedSpScPipeFactory; class ExecutionInstantiation { - private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionInstantiation.class); private static final int DEFAULT_COLOR = 0; + private static final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); + private static final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); + private static final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); - private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); - private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); - private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); - - private final ConfigurationContext configuration; + private final ConfigurationContext context; public ExecutionInstantiation(final ConfigurationContext configuration) { - this.configuration = configuration; + this.context = configuration; + } + + void instantiatePipes() { + int color = DEFAULT_COLOR; + Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); + Set<Stage> threadableStageJobs = context.getThreadableStages().keySet(); + for (Stage threadableStage : threadableStageJobs) { + color++; + colors.put(threadableStage, color); + + ThreadPainter threadPainter = new ThreadPainter(colors, color, context); + threadPainter.colorAndConnectStages(threadableStage); + } } - @SuppressWarnings({ "rawtypes" }) - int colorAndConnectStages(final int color, final Map<Stage, Integer> colors, final Stage threadableStage, final ConfigurationContext configuration) { - Set<Stage> threadableStages = configuration.getThreadableStages().keySet(); + private static class ThreadPainter { + + private final Map<Stage, Integer> colors; + private final int color; + private final ConfigurationContext context; + + public ThreadPainter(final Map<Stage, Integer> colors, final int color, final ConfigurationContext context) { + super(); + this.colors = colors; + this.color = color; + this.context = context; + } + + public int colorAndConnectStages(final Stage stage) { + int createdConnections = 0; - int createdConnections = 0; - for (OutputPort outputPort : threadableStage.getOutputPorts()) { - if (outputPort.pipe != null) { - if (outputPort.pipe instanceof InstantiationPipe) { + for (OutputPort<?> outputPort : stage.getOutputPorts()) { + if (outputPort.pipe != null && outputPort.pipe instanceof InstantiationPipe) { InstantiationPipe pipe = (InstantiationPipe) outputPort.pipe; - createdConnections += processPipe(color, colors, configuration, threadableStages, outputPort, pipe); + createdConnections += processPipe(outputPort, pipe); createdConnections++; } } + return createdConnections; } - return createdConnections; - } - @SuppressWarnings({ "unchecked", "rawtypes" }) - private int processPipe(final int color, final Map<Stage, Integer> colors, final ConfigurationContext configuration, final Set<Stage> threadableStages, - final OutputPort outputPort, final InstantiationPipe pipe) { - Stage targetStage = pipe.getTargetPort().getOwningStage(); + @SuppressWarnings({ "rawtypes", "unchecked" }) + private int processPipe(final OutputPort outputPort, final InstantiationPipe pipe) { + Set<Stage> threadableStages = context.getThreadableStages().keySet(); + int numCreatedConnections; - int targetColor = colors.containsKey(targetStage) ? colors.get(targetStage) : DEFAULT_COLOR; + Stage targetStage = pipe.getTargetPort().getOwningStage(); + int targetColor = colors.containsKey(targetStage) ? colors.get(targetStage) : DEFAULT_COLOR; - if (threadableStages.contains(targetStage) && targetColor != color) { - if (pipe.capacity() != 0) { - interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.capacity()); + if (threadableStages.contains(targetStage) && targetColor != color) { + if (pipe.capacity() != 0) { + interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.capacity()); + } else { + interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4); + } + numCreatedConnections = 0; } else { - interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4); - } - } else { - if (colors.containsKey(targetStage)) { - if (!colors.get(targetStage).equals(color)) { - throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") + if (colors.containsKey(targetStage)) { + if (!colors.get(targetStage).equals(color)) { + throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") + } } + intraThreadPipeFactory.create(outputPort, pipe.getTargetPort()); + colors.put(targetStage, color); + numCreatedConnections = colorAndConnectStages(targetStage); } - intraThreadPipeFactory.create(outputPort, pipe.getTargetPort()); - colors.put(targetStage, color); - return colorAndConnectStages(color, colors, targetStage, configuration); - } - return 0; - } - void instantiatePipes() { - int color = DEFAULT_COLOR; - Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); - Set<Stage> threadableStageJobs = configuration.getThreadableStages().keySet(); - int numCreatedConnections = 0; - for (Stage threadableStage : threadableStageJobs) { - color++; - colors.put(threadableStage, color); - numCreatedConnections += colorAndConnectStages(color, colors, threadableStage, configuration); + return numCreatedConnections; } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Created " + numCreatedConnections + " connections"); - } } }