From f00d362f02046554f5205f715af994bcffae762a Mon Sep 17 00:00:00 2001 From: Nelson Tavares de Sousa <stu103017@mail.uni-kiel.de> Date: Fri, 12 Jun 2015 16:22:25 +0200 Subject: [PATCH] minor refactoring and solved issue with one junit test --- src/main/java/teetime/framework/Analysis.java | 18 ++++++++++-------- ...RunnableConsumerStageTestConfiguration.java | 3 ++- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index 17a7e94a..7ac043e6 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -73,7 +73,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught private final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); private final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); private final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); - private Integer connected = new Integer(0); + private Integer createdConnections = new Integer(0); /** * Creates a new {@link Analysis} that skips validating the port connections and uses the default listener. @@ -143,7 +143,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } for (Stage stage : threadableStageJobs) { - final Thread thread = initializeStages(stage); + final Thread thread = initializeThreadableStages(stage); final Set<Stage> intraStages = traverseIntraStages(stage); final AbstractExceptionListener newListener = factory.createInstance(); @@ -152,7 +152,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } - private Thread initializeStages(final Stage stage) { + private Thread initializeThreadableStages(final Stage stage) { final Thread thread; final TerminationStrategy terminationStrategy = stage.getTerminationStrategy(); @@ -209,8 +209,11 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught colors.put(threadableStage, i); // Markiere den threadHead colorAndConnectStages(i, colors, threadableStage); } - if (configuration.getConnections().size() != connected) { - throw new IllegalStateException("remaining " + (configuration.getConnections().size() - connected) + " connections"); + if (configuration.getConnections().size() != createdConnections) { + for (Connection<?> conn : configuration.getConnections()) { + System.out.println(conn.getSourcePort().getOwningStage().getId() + " connects with " + conn.getTargetPort().getOwningStage().getId()); + } + throw new IllegalStateException("Remaining " + (configuration.getConnections().size() - createdConnections) + " connection(s)"); } } @@ -233,15 +236,14 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } else { if (colors.containsKey(targetStage)) { if (colors.get(targetStage).equals(i)) { - throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (not the "headstage") + throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") } } intraThreadPipeFactory.create(connection.getSourcePort(), connection.getTargetPort()); colors.put(targetStage, i); colorAndConnectStages(i, colors, targetStage); } - connected++; - // configuration.getConnections().remove(connection); remove connection to increase performance + createdConnections++; } } } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index 7dca2a99..74d0cd73 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -35,7 +35,8 @@ public class RunnableConsumerStageTestConfiguration extends AnalysisConfiguratio CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements); addThreadableStage(collectorSink); - connectPorts(producer.getOutputPort(), collectorSink.getInputPort()); + // Can not use createPorts, as the if condition above will lead to an exception + connectBoundedInterThreads(producer.getOutputPort(), collectorSink.getInputPort()); this.collectorSink = collectorSink; } -- GitLab