diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index 4812ea1f950b08b4c3f8a230cf12e6da71c8a870..956d419046e268db51fbcda319636b0688493817 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -140,35 +140,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } for (Stage stage : threadableStageJobs) { - final Thread thread; - - final TerminationStrategy terminationStrategy = stage.getTerminationStrategy(); - switch (terminationStrategy) { - case BY_SIGNAL: { - final RunnableConsumerStage runnable = new RunnableConsumerStage(stage); - thread = createThread(runnable, stage.getId()); - this.consumerThreads.add(thread); - break; - } - case BY_SELF_DECISION: { - final RunnableProducerStage runnable = new RunnableProducerStage(stage); - thread = createThread(runnable, stage.getId()); - this.finiteProducerThreads.add(thread); - InitializingSignal initializingSignal = new InitializingSignal(); - stage.onSignal(initializingSignal, null); - break; - } - case BY_INTERRUPT: { - final RunnableProducerStage runnable = new RunnableProducerStage(stage); - thread = createThread(runnable, stage.getId()); - InitializingSignal initializingSignal = new InitializingSignal(); - stage.onSignal(initializingSignal, null); - this.infiniteProducerThreads.add(thread); - break; - } - default: - throw new IllegalStateException("Unhandled termination strategy: " + terminationStrategy); - } + final Thread thread = initializeStages(stage); + final Set<Stage> intraStages = traverseIntraStages(stage); final AbstractExceptionListener newListener = factory.createInstance(); initializeIntraStages(intraStages, thread, newListener); @@ -176,6 +149,39 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } + private Thread initializeStages(final Stage stage) { + final Thread thread; + + final TerminationStrategy terminationStrategy = stage.getTerminationStrategy(); + switch (terminationStrategy) { + case BY_SIGNAL: { + final RunnableConsumerStage runnable = new RunnableConsumerStage(stage); + thread = createThread(runnable, stage.getId()); + this.consumerThreads.add(thread); + break; + } + case BY_SELF_DECISION: { + final RunnableProducerStage runnable = new RunnableProducerStage(stage); + thread = createThread(runnable, stage.getId()); + this.finiteProducerThreads.add(thread); + InitializingSignal initializingSignal = new InitializingSignal(); + stage.onSignal(initializingSignal, null); + break; + } + case BY_INTERRUPT: { + final RunnableProducerStage runnable = new RunnableProducerStage(stage); + thread = createThread(runnable, stage.getId()); + InitializingSignal initializingSignal = new InitializingSignal(); + stage.onSignal(initializingSignal, null); + this.infiniteProducerThreads.add(thread); + break; + } + default: + throw new IllegalStateException("Unhandled termination strategy: " + terminationStrategy); + } + return thread; + } + private void instantiatePipes() { Set<Stage> threadableStageJobs = configuration.getThreadableStageJobs(); for (Connection connection : configuration.getConnections()) {