Skip to content
Snippets Groups Projects
Commit 836da0ea authored by Christian Wulf's avatar Christian Wulf
Browse files

minor refactorings

parent 502093cd
No related branches found
No related tags found
No related merge requests found
...@@ -16,13 +16,16 @@ ...@@ -16,13 +16,16 @@
package teetime.framework; package teetime.framework;
/** /**
* Represents a configuration of connected stages. * Represents a configuration of connected stages. Available to be extended.
* *
* @author Christian Wulf, Nelson Tavares de Sousa * @author Christian Wulf, Nelson Tavares de Sousa
* *
* @since 2.0 * @since 2.0
* *
*/ */
public abstract class Configuration extends AbstractCompositeStage { public class Configuration extends AbstractCompositeStage {
protected Configuration() {
// protected ctor to prevent direct instantiation.
}
} }
...@@ -31,6 +31,7 @@ import teetime.framework.pipe.UnboundedSpScPipeFactory; ...@@ -31,6 +31,7 @@ import teetime.framework.pipe.UnboundedSpScPipeFactory;
class ExecutionInstantiation { class ExecutionInstantiation {
private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionInstantiation.class); private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionInstantiation.class);
private static final int DEFAULT_COLOR = 0;
private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory();
private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory();
...@@ -42,35 +43,16 @@ class ExecutionInstantiation { ...@@ -42,35 +43,16 @@ class ExecutionInstantiation {
this.configuration = configuration; this.configuration = configuration;
} }
@SuppressWarnings("rawtypes") @SuppressWarnings({ "rawtypes" })
Integer colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage, final ConfigurationContext configuration) { int colorAndConnectStages(final int color, final Map<Stage, Integer> colors, final Stage threadableStage, final ConfigurationContext configuration) {
Integer createdConnections = new Integer(0); Set<Stage> threadableStages = configuration.getThreadableStages().keySet();
Set<Stage> threadableStageJobs = configuration.getThreadableStages().keySet();
int createdConnections = 0;
for (OutputPort outputPort : threadableStage.getOutputPorts()) { for (OutputPort outputPort : threadableStage.getOutputPorts()) {
if (outputPort.pipe != null) { if (outputPort.pipe != null) {
if (outputPort.pipe instanceof InstantiationPipe) { if (outputPort.pipe instanceof InstantiationPipe) {
InstantiationPipe pipe = (InstantiationPipe) outputPort.pipe; InstantiationPipe pipe = (InstantiationPipe) outputPort.pipe;
Stage targetStage = pipe.getTargetPort().getOwningStage(); createdConnections += processPipe(color, colors, configuration, threadableStages, outputPort, pipe);
Integer targetColor = new Integer(0);
if (colors.containsKey(targetStage)) {
targetColor = colors.get(targetStage);
}
if (threadableStageJobs.contains(targetStage) && targetColor.compareTo(i) != 0) {
if (pipe.getCapacity() != 0) {
interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.getCapacity());
} else {
interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4);
}
} else {
if (colors.containsKey(targetStage)) {
if (!colors.get(targetStage).equals(i)) {
throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage")
}
}
intraThreadPipeFactory.create(outputPort, pipe.getTargetPort());
colors.put(targetStage, i);
createdConnections += colorAndConnectStages(i, colors, targetStage, configuration);
}
createdConnections++; createdConnections++;
} }
} }
...@@ -79,15 +61,44 @@ class ExecutionInstantiation { ...@@ -79,15 +61,44 @@ class ExecutionInstantiation {
return createdConnections; return createdConnections;
} }
@SuppressWarnings({ "unchecked", "rawtypes" })
private int processPipe(final int color, final Map<Stage, Integer> colors, final ConfigurationContext configuration, final Set<Stage> threadableStages,
final OutputPort outputPort, final InstantiationPipe pipe) {
Stage targetStage = pipe.getTargetPort().getOwningStage();
int targetColor = DEFAULT_COLOR;
if (colors.containsKey(targetStage)) {
targetColor = colors.get(targetStage);
}
if (threadableStages.contains(targetStage) && targetColor != color) {
if (pipe.getCapacity() != 0) {
interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.getCapacity());
} else {
interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4);
}
} else {
if (colors.containsKey(targetStage)) {
if (!colors.get(targetStage).equals(color)) {
throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage")
}
}
intraThreadPipeFactory.create(outputPort, pipe.getTargetPort());
colors.put(targetStage, color);
return colorAndConnectStages(color, colors, targetStage, configuration);
}
return 0;
}
void instantiatePipes() { void instantiatePipes() {
Integer i = new Integer(0); int color = DEFAULT_COLOR;
Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); Map<Stage, Integer> colors = new HashMap<Stage, Integer>();
Set<Stage> threadableStageJobs = configuration.getThreadableStages().keySet(); Set<Stage> threadableStageJobs = configuration.getThreadableStages().keySet();
Integer createdConnections = 0; Integer createdConnections = 0;
for (Stage threadableStage : threadableStageJobs) { for (Stage threadableStage : threadableStageJobs) {
i++; color++;
colors.put(threadableStage, i); colors.put(threadableStage, color);
createdConnections = colorAndConnectStages(i, colors, threadableStage, configuration); createdConnections = colorAndConnectStages(color, colors, threadableStage, configuration);
} }
LOGGER.debug("Created " + createdConnections + " connections"); LOGGER.debug("Created " + createdConnections + " connections");
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment