diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index 50e4f96f2b963cb38de20df715375b45829d04ad..d4343a970ff6a1fc859ba1a84cb082f6b495a932 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -31,6 +31,7 @@ import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.exceptionHandling.IExceptionListenerFactory; import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory; import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.InstantiationPipe; import teetime.framework.pipe.SingleElementPipeFactory; import teetime.framework.pipe.SpScPipeFactory; import teetime.framework.pipe.UnboundedSpScPipeFactory; @@ -200,38 +201,44 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught colors.put(threadableStage, i); // Markiere den threadHead colorAndConnectStages(i, colors, threadableStage); } - if (configuration.getConnections().size() != createdConnections) { - throw new IllegalStateException("Remaining " + (configuration.getConnections().size() - createdConnections) + " connection(s)"); - } + // if (configuration.getConnections().size() != createdConnections) { + // throw new IllegalStateException("Remaining " + (configuration.getConnections().size() - createdConnections) + " connection(s)"); + // } } public void colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage) { Set<Stage> threadableStageJobs = configuration.getThreadableStageJobs(); - for (Connection connection : configuration.getConnections()) { - if (connection.getSourcePort().getOwningStage() == threadableStage) { - Stage targetStage = connection.getTargetPort().getOwningStage(); - Integer targetColor = new Integer(0); - if (colors.containsKey(targetStage)) { - targetColor = colors.get(targetStage); - } - if (threadableStageJobs.contains(targetStage) && targetColor.compareTo(i) != 0) { - if (connection.getCapacity() != 0) { - interBoundedThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort(), connection.getCapacity()); - } else { - interUnboundedThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort(), 4); - } - } else { + for (OutputPort outputPort : threadableStage.getOutputPorts()) { + if (outputPort.pipe != null) { + InstantiationPipe pipe; + if (outputPort.pipe instanceof InstantiationPipe) { + pipe = (InstantiationPipe) outputPort.pipe; + Connection connection = new Connection(outputPort, pipe.getTargetPort(), pipe.getCapacity()); + Stage targetStage = pipe.getTargetPort().getOwningStage(); + Integer targetColor = new Integer(0); if (colors.containsKey(targetStage)) { - if (!colors.get(targetStage).equals(i)) { - throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") + targetColor = colors.get(targetStage); + } + if (threadableStageJobs.contains(targetStage) && targetColor.compareTo(i) != 0) { + if (pipe.getCapacity() != 0) { + interBoundedThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort(), connection.getCapacity()); + } else { + interUnboundedThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort(), 4); + } + } else { + if (colors.containsKey(targetStage)) { + if (!colors.get(targetStage).equals(i)) { + throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") + } } + intraThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort()); + colors.put(targetStage, i); + colorAndConnectStages(i, colors, targetStage); } - intraThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort()); - colors.put(targetStage, i); - colorAndConnectStages(i, colors, targetStage); + createdConnections++; } - createdConnections++; } + } } diff --git a/src/main/java/teetime/framework/AnalysisConfiguration.java b/src/main/java/teetime/framework/AnalysisConfiguration.java index 26139ce63fb552b01dd65c886c314ad3a4eaedd7..7b91b58341d622812730dea56f9d178e6df48e13 100644 --- a/src/main/java/teetime/framework/AnalysisConfiguration.java +++ b/src/main/java/teetime/framework/AnalysisConfiguration.java @@ -20,6 +20,7 @@ import java.util.Set; import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.InstantiationPipe; import teetime.framework.pipe.PipeFactoryRegistry; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; @@ -178,7 +179,8 @@ public abstract class AnalysisConfiguration { * the pipe is set to this capacity, if the value is greater than 0. If it is 0, than the pipe is unbounded, thus growing of the pipe is enabled. */ protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - connections.add(new Connection<T>(sourcePort, targetPort, capacity)); + new InstantiationPipe<T>(sourcePort, targetPort, capacity); + // connections.add(new Connection<T>(sourcePort, targetPort, capacity)); } /** diff --git a/src/main/java/teetime/framework/pipe/InstantiationPipe.java b/src/main/java/teetime/framework/pipe/InstantiationPipe.java new file mode 100644 index 0000000000000000000000000000000000000000..6a93b039ab5d3629326365112d0c103fc0885541 --- /dev/null +++ b/src/main/java/teetime/framework/pipe/InstantiationPipe.java @@ -0,0 +1,47 @@ +package teetime.framework.pipe; + +import teetime.framework.AbstractIntraThreadPipe; +import teetime.framework.InputPort; +import teetime.framework.OutputPort; + +public class InstantiationPipe<T> extends AbstractIntraThreadPipe { + + private final InputPort<T> target; + private final int capacity; + + public InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + super(sourcePort, targetPort); + this.target = targetPort; + this.capacity = capacity; + sourcePort.setPipe(this); + } + + public int getCapacity() { + return capacity; + } + + public InputPort<T> getTarget() { + return target; + } + + @Override + public boolean add(final Object element) { + throw new IllegalStateException("Should not be called"); + } + + @Override + public boolean isEmpty() { + throw new IllegalStateException("Should not be called"); + } + + @Override + public int size() { + throw new IllegalStateException("Should not be called"); + } + + @Override + public Object removeLast() { + throw new IllegalStateException("Should not be called"); + } + +}