Skip to content
Snippets Groups Projects
Commit 58d2c74e authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

extracted logic into seperate method

parent cfa33650
No related branches found
No related tags found
No related merge requests found
......@@ -140,35 +140,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
}
for (Stage stage : threadableStageJobs) {
final Thread thread;
final TerminationStrategy terminationStrategy = stage.getTerminationStrategy();
switch (terminationStrategy) {
case BY_SIGNAL: {
final RunnableConsumerStage runnable = new RunnableConsumerStage(stage);
thread = createThread(runnable, stage.getId());
this.consumerThreads.add(thread);
break;
}
case BY_SELF_DECISION: {
final RunnableProducerStage runnable = new RunnableProducerStage(stage);
thread = createThread(runnable, stage.getId());
this.finiteProducerThreads.add(thread);
InitializingSignal initializingSignal = new InitializingSignal();
stage.onSignal(initializingSignal, null);
break;
}
case BY_INTERRUPT: {
final RunnableProducerStage runnable = new RunnableProducerStage(stage);
thread = createThread(runnable, stage.getId());
InitializingSignal initializingSignal = new InitializingSignal();
stage.onSignal(initializingSignal, null);
this.infiniteProducerThreads.add(thread);
break;
}
default:
throw new IllegalStateException("Unhandled termination strategy: " + terminationStrategy);
}
final Thread thread = initializeStages(stage);
final Set<Stage> intraStages = traverseIntraStages(stage);
final AbstractExceptionListener newListener = factory.createInstance();
initializeIntraStages(intraStages, thread, newListener);
......@@ -176,6 +149,39 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
}
private Thread initializeStages(final Stage stage) {
final Thread thread;
final TerminationStrategy terminationStrategy = stage.getTerminationStrategy();
switch (terminationStrategy) {
case BY_SIGNAL: {
final RunnableConsumerStage runnable = new RunnableConsumerStage(stage);
thread = createThread(runnable, stage.getId());
this.consumerThreads.add(thread);
break;
}
case BY_SELF_DECISION: {
final RunnableProducerStage runnable = new RunnableProducerStage(stage);
thread = createThread(runnable, stage.getId());
this.finiteProducerThreads.add(thread);
InitializingSignal initializingSignal = new InitializingSignal();
stage.onSignal(initializingSignal, null);
break;
}
case BY_INTERRUPT: {
final RunnableProducerStage runnable = new RunnableProducerStage(stage);
thread = createThread(runnable, stage.getId());
InitializingSignal initializingSignal = new InitializingSignal();
stage.onSignal(initializingSignal, null);
this.infiniteProducerThreads.add(thread);
break;
}
default:
throw new IllegalStateException("Unhandled termination strategy: " + terminationStrategy);
}
return thread;
}
private void instantiatePipes() {
Set<Stage> threadableStageJobs = configuration.getThreadableStageJobs();
for (Connection connection : configuration.getConnections()) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment