Skip to content
Snippets Groups Projects
Commit d2004b4a authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

new method on how to connect pipes #33

parent 0eef8d91
No related branches found
No related tags found
No related merge requests found
...@@ -31,6 +31,7 @@ import teetime.framework.exceptionHandling.AbstractExceptionListener; ...@@ -31,6 +31,7 @@ import teetime.framework.exceptionHandling.AbstractExceptionListener;
import teetime.framework.exceptionHandling.IExceptionListenerFactory; import teetime.framework.exceptionHandling.IExceptionListenerFactory;
import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory; import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory;
import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.InstantiationPipe;
import teetime.framework.pipe.SingleElementPipeFactory; import teetime.framework.pipe.SingleElementPipeFactory;
import teetime.framework.pipe.SpScPipeFactory; import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.pipe.UnboundedSpScPipeFactory; import teetime.framework.pipe.UnboundedSpScPipeFactory;
...@@ -200,38 +201,44 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -200,38 +201,44 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
colors.put(threadableStage, i); // Markiere den threadHead colors.put(threadableStage, i); // Markiere den threadHead
colorAndConnectStages(i, colors, threadableStage); colorAndConnectStages(i, colors, threadableStage);
} }
if (configuration.getConnections().size() != createdConnections) { // if (configuration.getConnections().size() != createdConnections) {
throw new IllegalStateException("Remaining " + (configuration.getConnections().size() - createdConnections) + " connection(s)"); // throw new IllegalStateException("Remaining " + (configuration.getConnections().size() - createdConnections) + " connection(s)");
} // }
} }
public void colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage) { public void colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage) {
Set<Stage> threadableStageJobs = configuration.getThreadableStageJobs(); Set<Stage> threadableStageJobs = configuration.getThreadableStageJobs();
for (Connection connection : configuration.getConnections()) { for (OutputPort outputPort : threadableStage.getOutputPorts()) {
if (connection.getSourcePort().getOwningStage() == threadableStage) { if (outputPort.pipe != null) {
Stage targetStage = connection.getTargetPort().getOwningStage(); InstantiationPipe pipe;
Integer targetColor = new Integer(0); if (outputPort.pipe instanceof InstantiationPipe) {
if (colors.containsKey(targetStage)) { pipe = (InstantiationPipe) outputPort.pipe;
targetColor = colors.get(targetStage); Connection connection = new Connection(outputPort, pipe.getTargetPort(), pipe.getCapacity());
} Stage targetStage = pipe.getTargetPort().getOwningStage();
if (threadableStageJobs.contains(targetStage) && targetColor.compareTo(i) != 0) { Integer targetColor = new Integer(0);
if (connection.getCapacity() != 0) {
interBoundedThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort(), connection.getCapacity());
} else {
interUnboundedThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort(), 4);
}
} else {
if (colors.containsKey(targetStage)) { if (colors.containsKey(targetStage)) {
if (!colors.get(targetStage).equals(i)) { targetColor = colors.get(targetStage);
throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") }
if (threadableStageJobs.contains(targetStage) && targetColor.compareTo(i) != 0) {
if (pipe.getCapacity() != 0) {
interBoundedThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort(), connection.getCapacity());
} else {
interUnboundedThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort(), 4);
}
} else {
if (colors.containsKey(targetStage)) {
if (!colors.get(targetStage).equals(i)) {
throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage")
}
} }
intraThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort());
colors.put(targetStage, i);
colorAndConnectStages(i, colors, targetStage);
} }
intraThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort()); createdConnections++;
colors.put(targetStage, i);
colorAndConnectStages(i, colors, targetStage);
} }
createdConnections++;
} }
} }
} }
......
...@@ -20,6 +20,7 @@ import java.util.Set; ...@@ -20,6 +20,7 @@ import java.util.Set;
import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.InstantiationPipe;
import teetime.framework.pipe.PipeFactoryRegistry; import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
...@@ -178,7 +179,8 @@ public abstract class AnalysisConfiguration { ...@@ -178,7 +179,8 @@ public abstract class AnalysisConfiguration {
* the pipe is set to this capacity, if the value is greater than 0. If it is 0, than the pipe is unbounded, thus growing of the pipe is enabled. * the pipe is set to this capacity, if the value is greater than 0. If it is 0, than the pipe is unbounded, thus growing of the pipe is enabled.
*/ */
protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
connections.add(new Connection<T>(sourcePort, targetPort, capacity)); new InstantiationPipe<T>(sourcePort, targetPort, capacity);
// connections.add(new Connection<T>(sourcePort, targetPort, capacity));
} }
/** /**
......
package teetime.framework.pipe;
import teetime.framework.AbstractIntraThreadPipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
public class InstantiationPipe<T> extends AbstractIntraThreadPipe {
private final InputPort<T> target;
private final int capacity;
public InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort);
this.target = targetPort;
this.capacity = capacity;
sourcePort.setPipe(this);
}
public int getCapacity() {
return capacity;
}
public InputPort<T> getTarget() {
return target;
}
@Override
public boolean add(final Object element) {
throw new IllegalStateException("Should not be called");
}
@Override
public boolean isEmpty() {
throw new IllegalStateException("Should not be called");
}
@Override
public int size() {
throw new IllegalStateException("Should not be called");
}
@Override
public Object removeLast() {
throw new IllegalStateException("Should not be called");
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment