diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index 7ac043e62eba5521248b78985fe353e4583ea692..7457d16e11820827506ed23ee5d0eb3de9741aac 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -210,9 +210,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught colorAndConnectStages(i, colors, threadableStage); } if (configuration.getConnections().size() != createdConnections) { - for (Connection<?> conn : configuration.getConnections()) { - System.out.println(conn.getSourcePort().getOwningStage().getId() + " connects with " + conn.getTargetPort().getOwningStage().getId()); - } throw new IllegalStateException("Remaining " + (configuration.getConnections().size() - createdConnections) + " connection(s)"); } } @@ -235,7 +232,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } } else { if (colors.containsKey(targetStage)) { - if (colors.get(targetStage).equals(i)) { + if (!colors.get(targetStage).equals(i)) { throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") } } diff --git a/src/main/java/teetime/framework/AnalysisConfiguration.java b/src/main/java/teetime/framework/AnalysisConfiguration.java index 56004767295c9f576846129ae23ac312779b2ee9..26139ce63fb552b01dd65c886c314ad3a4eaedd7 100644 --- a/src/main/java/teetime/framework/AnalysisConfiguration.java +++ b/src/main/java/teetime/framework/AnalysisConfiguration.java @@ -86,7 +86,7 @@ public abstract class AnalysisConfiguration { * @return * the pipe instance which connects the two given stages * - * @deprecated since 1.2 + * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead. */ @Deprecated protected static <T> IPipe connectIntraThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { @@ -101,7 +101,7 @@ public abstract class AnalysisConfiguration { * @return * the pipe instance which connects the two given stages * - * @deprecated since 1.2 + * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead. */ @Deprecated protected static <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { @@ -116,7 +116,7 @@ public abstract class AnalysisConfiguration { * @return * the pipe instance which connects the two given stages * - * @deprecated since 1.2 + * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead. */ @Deprecated protected static <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { @@ -132,7 +132,7 @@ public abstract class AnalysisConfiguration { * capacity of the underlying queue * @return * - * @deprecated since 1.2 + * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead. */ @Deprecated protected static <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { @@ -148,7 +148,7 @@ public abstract class AnalysisConfiguration { * capacity of the underlying queue * @return * - * @deprecated since 1.2 + * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead. */ @Deprecated protected static <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { diff --git a/src/test/java/teetime/framework/AnalysisTest.java b/src/test/java/teetime/framework/AnalysisTest.java index 5561f8094ff93ecdea16f784d28152d60bb14e04..5c347dfe763eaa8627db1930e973dcf26eac1594 100644 --- a/src/test/java/teetime/framework/AnalysisTest.java +++ b/src/test/java/teetime/framework/AnalysisTest.java @@ -29,6 +29,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import teetime.stage.InitialElementProducer; +import teetime.stage.InstanceOfFilter; import teetime.stage.basic.Sink; import teetime.util.StopWatch; @@ -131,6 +132,22 @@ public class AnalysisTest { public void testInstantiatePipesIncorrectConfiguration() { thrown.expect(IllegalStateException.class); thrown.expectMessage("Crossing threads"); + InvalidTestConfig configuration = new InvalidTestConfig(); + new Analysis<InvalidTestConfig>(configuration); + } + + private class InvalidTestConfig extends AnalysisConfiguration { + public InitialElementProducer<Object> init = new InitialElementProducer<Object>(); + public InstanceOfFilter<Object, Object> iof = new InstanceOfFilter<Object, Object>(Object.class); + public Sink<Object> sink = new Sink<Object>(); + + public InvalidTestConfig() { + connectPorts(init.getOutputPort(), iof.getInputPort()); + connectPorts(iof.getMatchedOutputPort(), sink.getInputPort()); + connectPorts(init.createOutputPort(), sink.createInputPort()); + addThreadableStage(init); + addThreadableStage(iof); + } } }