Skip to content
Snippets Groups Projects
Commit f00d362f authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

minor refactoring and solved issue with one junit test

parent 82acdaf7
No related branches found
No related tags found
1 merge request!41Pipe instantiation
...@@ -73,7 +73,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -73,7 +73,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory();
private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory();
private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory();
private Integer connected = new Integer(0); private Integer createdConnections = new Integer(0);
/** /**
* Creates a new {@link Analysis} that skips validating the port connections and uses the default listener. * Creates a new {@link Analysis} that skips validating the port connections and uses the default listener.
...@@ -143,7 +143,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -143,7 +143,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
} }
for (Stage stage : threadableStageJobs) { for (Stage stage : threadableStageJobs) {
final Thread thread = initializeStages(stage); final Thread thread = initializeThreadableStages(stage);
final Set<Stage> intraStages = traverseIntraStages(stage); final Set<Stage> intraStages = traverseIntraStages(stage);
final AbstractExceptionListener newListener = factory.createInstance(); final AbstractExceptionListener newListener = factory.createInstance();
...@@ -152,7 +152,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -152,7 +152,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
} }
private Thread initializeStages(final Stage stage) { private Thread initializeThreadableStages(final Stage stage) {
final Thread thread; final Thread thread;
final TerminationStrategy terminationStrategy = stage.getTerminationStrategy(); final TerminationStrategy terminationStrategy = stage.getTerminationStrategy();
...@@ -209,8 +209,11 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -209,8 +209,11 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
colors.put(threadableStage, i); // Markiere den threadHead colors.put(threadableStage, i); // Markiere den threadHead
colorAndConnectStages(i, colors, threadableStage); colorAndConnectStages(i, colors, threadableStage);
} }
if (configuration.getConnections().size() != connected) { if (configuration.getConnections().size() != createdConnections) {
throw new IllegalStateException("remaining " + (configuration.getConnections().size() - connected) + " connections"); for (Connection<?> conn : configuration.getConnections()) {
System.out.println(conn.getSourcePort().getOwningStage().getId() + " connects with " + conn.getTargetPort().getOwningStage().getId());
}
throw new IllegalStateException("Remaining " + (configuration.getConnections().size() - createdConnections) + " connection(s)");
} }
} }
...@@ -233,15 +236,14 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -233,15 +236,14 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
} else { } else {
if (colors.containsKey(targetStage)) { if (colors.containsKey(targetStage)) {
if (colors.get(targetStage).equals(i)) { if (colors.get(targetStage).equals(i)) {
throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (not the "headstage") throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage")
} }
} }
intraThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort()); intraThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort());
colors.put(targetStage, i); colors.put(targetStage, i);
colorAndConnectStages(i, colors, targetStage); colorAndConnectStages(i, colors, targetStage);
} }
connected++; createdConnections++;
// configuration.getConnections().remove(connection); remove connection to increase performance
} }
} }
} }
......
...@@ -35,7 +35,8 @@ public class RunnableConsumerStageTestConfiguration extends AnalysisConfiguratio ...@@ -35,7 +35,8 @@ public class RunnableConsumerStageTestConfiguration extends AnalysisConfiguratio
CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements); CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements);
addThreadableStage(collectorSink); addThreadableStage(collectorSink);
connectPorts(producer.getOutputPort(), collectorSink.getInputPort()); // Can not use createPorts, as the if condition above will lead to an exception
connectBoundedInterThreads(producer.getOutputPort(), collectorSink.getInputPort());
this.collectorSink = collectorSink; this.collectorSink = collectorSink;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment