diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index 17a7e94a489625a06f45e62650f0de37dfac23e2..7ac043e62eba5521248b78985fe353e4583ea692 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -73,7 +73,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); - private Integer connected = new Integer(0); + private Integer createdConnections = new Integer(0); /** * Creates a new {@link Analysis} that skips validating the port connections and uses the default listener. @@ -143,7 +143,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } for (Stage stage : threadableStageJobs) { - final Thread thread = initializeStages(stage); + final Thread thread = initializeThreadableStages(stage); final Set<Stage> intraStages = traverseIntraStages(stage); final AbstractExceptionListener newListener = factory.createInstance(); @@ -152,7 +152,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } - private Thread initializeStages(final Stage stage) { + private Thread initializeThreadableStages(final Stage stage) { final Thread thread; final TerminationStrategy terminationStrategy = stage.getTerminationStrategy(); @@ -209,8 +209,11 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught colors.put(threadableStage, i); // Markiere den threadHead colorAndConnectStages(i, colors, threadableStage); } - if (configuration.getConnections().size() != connected) { - throw new IllegalStateException("remaining " + (configuration.getConnections().size() - connected) + " connections"); + if (configuration.getConnections().size() != createdConnections) { + for (Connection<?> conn : configuration.getConnections()) { + System.out.println(conn.getSourcePort().getOwningStage().getId() + " connects with " + conn.getTargetPort().getOwningStage().getId()); + } + throw new IllegalStateException("Remaining " + (configuration.getConnections().size() - createdConnections) + " connection(s)"); } } @@ -233,15 +236,14 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } else { if (colors.containsKey(targetStage)) { if (colors.get(targetStage).equals(i)) { - throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (not the "headstage") + throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") } } intraThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort()); colors.put(targetStage, i); colorAndConnectStages(i, colors, targetStage); } - connected++; - // configuration.getConnections().remove(connection); remove connection to increase performance + createdConnections++; } } } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index 7dca2a99fc172c73a12e247aae3cd4693d67d0de..74d0cd73531a604c7170771fb699788ae80cbde0 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -35,7 +35,8 @@ public class RunnableConsumerStageTestConfiguration extends AnalysisConfiguratio CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements); addThreadableStage(collectorSink); - connectPorts(producer.getOutputPort(), collectorSink.getInputPort()); + // Can not use createPorts, as the if condition above will lead to an exception + connectBoundedInterThreads(producer.getOutputPort(), collectorSink.getInputPort()); this.collectorSink = collectorSink; }