Skip to content
Snippets Groups Projects
Commit 31b88866 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

finished test, solved one bug and updated javadoc

parent da76c7fa
No related branches found
No related tags found
No related merge requests found
...@@ -210,9 +210,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -210,9 +210,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
colorAndConnectStages(i, colors, threadableStage); colorAndConnectStages(i, colors, threadableStage);
} }
if (configuration.getConnections().size() != createdConnections) { if (configuration.getConnections().size() != createdConnections) {
for (Connection<?> conn : configuration.getConnections()) {
System.out.println(conn.getSourcePort().getOwningStage().getId() + " connects with " + conn.getTargetPort().getOwningStage().getId());
}
throw new IllegalStateException("Remaining " + (configuration.getConnections().size() - createdConnections) + " connection(s)"); throw new IllegalStateException("Remaining " + (configuration.getConnections().size() - createdConnections) + " connection(s)");
} }
} }
...@@ -235,7 +232,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -235,7 +232,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
} }
} else { } else {
if (colors.containsKey(targetStage)) { if (colors.containsKey(targetStage)) {
if (colors.get(targetStage).equals(i)) { if (!colors.get(targetStage).equals(i)) {
throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage")
} }
} }
......
...@@ -86,7 +86,7 @@ public abstract class AnalysisConfiguration { ...@@ -86,7 +86,7 @@ public abstract class AnalysisConfiguration {
* @return * @return
* the pipe instance which connects the two given stages * the pipe instance which connects the two given stages
* *
* @deprecated since 1.2 * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead.
*/ */
@Deprecated @Deprecated
protected static <T> IPipe connectIntraThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { protected static <T> IPipe connectIntraThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
...@@ -101,7 +101,7 @@ public abstract class AnalysisConfiguration { ...@@ -101,7 +101,7 @@ public abstract class AnalysisConfiguration {
* @return * @return
* the pipe instance which connects the two given stages * the pipe instance which connects the two given stages
* *
* @deprecated since 1.2 * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead.
*/ */
@Deprecated @Deprecated
protected static <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { protected static <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
...@@ -116,7 +116,7 @@ public abstract class AnalysisConfiguration { ...@@ -116,7 +116,7 @@ public abstract class AnalysisConfiguration {
* @return * @return
* the pipe instance which connects the two given stages * the pipe instance which connects the two given stages
* *
* @deprecated since 1.2 * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead.
*/ */
@Deprecated @Deprecated
protected static <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { protected static <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
...@@ -132,7 +132,7 @@ public abstract class AnalysisConfiguration { ...@@ -132,7 +132,7 @@ public abstract class AnalysisConfiguration {
* capacity of the underlying queue * capacity of the underlying queue
* @return * @return
* *
* @deprecated since 1.2 * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead.
*/ */
@Deprecated @Deprecated
protected static <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { protected static <T> IPipe connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
...@@ -148,7 +148,7 @@ public abstract class AnalysisConfiguration { ...@@ -148,7 +148,7 @@ public abstract class AnalysisConfiguration {
* capacity of the underlying queue * capacity of the underlying queue
* @return * @return
* *
* @deprecated since 1.2 * @deprecated since 1.2. Use {@link #connectPorts(OutputPort, InputPort)} instead.
*/ */
@Deprecated @Deprecated
protected static <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { protected static <T> IPipe connectUnboundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
......
...@@ -29,6 +29,7 @@ import org.junit.Test; ...@@ -29,6 +29,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import teetime.stage.InitialElementProducer; import teetime.stage.InitialElementProducer;
import teetime.stage.InstanceOfFilter;
import teetime.stage.basic.Sink; import teetime.stage.basic.Sink;
import teetime.util.StopWatch; import teetime.util.StopWatch;
...@@ -131,6 +132,22 @@ public class AnalysisTest { ...@@ -131,6 +132,22 @@ public class AnalysisTest {
public void testInstantiatePipesIncorrectConfiguration() { public void testInstantiatePipesIncorrectConfiguration() {
thrown.expect(IllegalStateException.class); thrown.expect(IllegalStateException.class);
thrown.expectMessage("Crossing threads"); thrown.expectMessage("Crossing threads");
InvalidTestConfig configuration = new InvalidTestConfig();
new Analysis<InvalidTestConfig>(configuration);
}
private class InvalidTestConfig extends AnalysisConfiguration {
public InitialElementProducer<Object> init = new InitialElementProducer<Object>();
public InstanceOfFilter<Object, Object> iof = new InstanceOfFilter<Object, Object>(Object.class);
public Sink<Object> sink = new Sink<Object>();
public InvalidTestConfig() {
connectPorts(init.getOutputPort(), iof.getInputPort());
connectPorts(iof.getMatchedOutputPort(), sink.getInputPort());
connectPorts(init.createOutputPort(), sink.createInputPort());
addThreadableStage(init);
addThreadableStage(iof);
}
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment