Skip to content
Snippets Groups Projects
Commit 6af22b38 authored by Christian Wulf's avatar Christian Wulf
Browse files

refactored ExecutionInstantiation

parent d9fbc75b
No related branches found
No related tags found
No related merge requests found
...@@ -19,9 +19,6 @@ import java.util.HashMap; ...@@ -19,9 +19,6 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.InstantiationPipe; import teetime.framework.pipe.InstantiationPipe;
import teetime.framework.pipe.SingleElementPipeFactory; import teetime.framework.pipe.SingleElementPipeFactory;
...@@ -30,77 +27,86 @@ import teetime.framework.pipe.UnboundedSpScPipeFactory; ...@@ -30,77 +27,86 @@ import teetime.framework.pipe.UnboundedSpScPipeFactory;
class ExecutionInstantiation { class ExecutionInstantiation {
private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionInstantiation.class);
private static final int DEFAULT_COLOR = 0; private static final int DEFAULT_COLOR = 0;
private static final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory();
private static final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory();
private static final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory();
private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); private final ConfigurationContext context;
private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory();
private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory();
private final ConfigurationContext configuration;
public ExecutionInstantiation(final ConfigurationContext configuration) { public ExecutionInstantiation(final ConfigurationContext configuration) {
this.configuration = configuration; this.context = configuration;
}
void instantiatePipes() {
int color = DEFAULT_COLOR;
Map<Stage, Integer> colors = new HashMap<Stage, Integer>();
Set<Stage> threadableStageJobs = context.getThreadableStages().keySet();
for (Stage threadableStage : threadableStageJobs) {
color++;
colors.put(threadableStage, color);
ThreadPainter threadPainter = new ThreadPainter(colors, color, context);
threadPainter.colorAndConnectStages(threadableStage);
}
} }
@SuppressWarnings({ "rawtypes" }) private static class ThreadPainter {
int colorAndConnectStages(final int color, final Map<Stage, Integer> colors, final Stage threadableStage, final ConfigurationContext configuration) {
Set<Stage> threadableStages = configuration.getThreadableStages().keySet(); private final Map<Stage, Integer> colors;
private final int color;
private final ConfigurationContext context;
public ThreadPainter(final Map<Stage, Integer> colors, final int color, final ConfigurationContext context) {
super();
this.colors = colors;
this.color = color;
this.context = context;
}
public int colorAndConnectStages(final Stage stage) {
int createdConnections = 0;
int createdConnections = 0; for (OutputPort<?> outputPort : stage.getOutputPorts()) {
for (OutputPort outputPort : threadableStage.getOutputPorts()) { if (outputPort.pipe != null && outputPort.pipe instanceof InstantiationPipe) {
if (outputPort.pipe != null) {
if (outputPort.pipe instanceof InstantiationPipe) {
InstantiationPipe pipe = (InstantiationPipe) outputPort.pipe; InstantiationPipe pipe = (InstantiationPipe) outputPort.pipe;
createdConnections += processPipe(color, colors, configuration, threadableStages, outputPort, pipe); createdConnections += processPipe(outputPort, pipe);
createdConnections++; createdConnections++;
} }
} }
return createdConnections;
} }
return createdConnections;
}
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "rawtypes", "unchecked" })
private int processPipe(final int color, final Map<Stage, Integer> colors, final ConfigurationContext configuration, final Set<Stage> threadableStages, private int processPipe(final OutputPort outputPort, final InstantiationPipe pipe) {
final OutputPort outputPort, final InstantiationPipe pipe) { Set<Stage> threadableStages = context.getThreadableStages().keySet();
Stage targetStage = pipe.getTargetPort().getOwningStage(); int numCreatedConnections;
int targetColor = colors.containsKey(targetStage) ? colors.get(targetStage) : DEFAULT_COLOR; Stage targetStage = pipe.getTargetPort().getOwningStage();
int targetColor = colors.containsKey(targetStage) ? colors.get(targetStage) : DEFAULT_COLOR;
if (threadableStages.contains(targetStage) && targetColor != color) { if (threadableStages.contains(targetStage) && targetColor != color) {
if (pipe.capacity() != 0) { if (pipe.capacity() != 0) {
interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.capacity()); interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.capacity());
} else {
interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4);
}
numCreatedConnections = 0;
} else { } else {
interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4); if (colors.containsKey(targetStage)) {
} if (!colors.get(targetStage).equals(color)) {
} else { throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage")
if (colors.containsKey(targetStage)) { }
if (!colors.get(targetStage).equals(color)) {
throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage")
} }
intraThreadPipeFactory.create(outputPort, pipe.getTargetPort());
colors.put(targetStage, color);
numCreatedConnections = colorAndConnectStages(targetStage);
} }
intraThreadPipeFactory.create(outputPort, pipe.getTargetPort());
colors.put(targetStage, color);
return colorAndConnectStages(color, colors, targetStage, configuration);
}
return 0;
}
void instantiatePipes() { return numCreatedConnections;
int color = DEFAULT_COLOR;
Map<Stage, Integer> colors = new HashMap<Stage, Integer>();
Set<Stage> threadableStageJobs = configuration.getThreadableStages().keySet();
int numCreatedConnections = 0;
for (Stage threadableStage : threadableStageJobs) {
color++;
colors.put(threadableStage, color);
numCreatedConnections += colorAndConnectStages(color, colors, threadableStage, configuration);
} }
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Created " + numCreatedConnections + " connections");
}
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment