diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java index 46e2315dfe52e0d03e97b467626c50d446d94f5a..c89df66e665f9a55785f6bed674aec2d53eebf26 100644 --- a/src/main/java/teetime/framework/ThreadService.java +++ b/src/main/java/teetime/framework/ThreadService.java @@ -43,10 +43,6 @@ class ThreadService extends AbstractService<ThreadService> { } - SignalingCounter getRunnableCounter() { - return runnableCounter; - } - private final Collection<ThreadThrowableContainer> exceptions = new ConcurrentLinkedQueue<ThreadThrowableContainer>(); private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); @@ -75,6 +71,62 @@ class ThreadService extends AbstractService<ThreadService> { onStart(); } + @Override + void onStart() { + startThreads(this.consumerThreads); + startThreads(this.finiteProducerThreads); + startThreads(this.infiniteProducerThreads); + + sendInitializingSignal(); + } + + @Override + void onExecute() { + sendStartingSignal(); + } + + @Override + void onTerminate() { + for (Stage stage : threadableStages.keySet()) { + stage.terminate(); + } + } + + @Override + void onFinish() { + try { + runnableCounter.waitFor(0); + + // LOGGER.debug("Waiting for finiteProducerThreads"); + // for (Thread thread : this.finiteProducerThreads) { + // thread.join(); + // } + // + // LOGGER.debug("Waiting for consumerThreads"); + // for (Thread thread : this.consumerThreads) { + // thread.join(); + // } + } catch (InterruptedException e) { + LOGGER.error("Execution has stopped unexpectedly", e); + for (Thread thread : this.finiteProducerThreads) { + thread.interrupt(); + } + + for (Thread thread : this.consumerThreads) { + thread.interrupt(); + } + } + + LOGGER.debug("Interrupting infiniteProducerThreads..."); + for (Thread thread : this.infiniteProducerThreads) { + thread.interrupt(); + } + + if (!exceptions.isEmpty()) { + throw new ExecutionException(exceptions); + } + } + private void initializeIntraStages(final Set<Stage> intraStages, final Thread thread, final AbstractExceptionListener newListener) { for (Stage intraStage : intraStages) { intraStage.setOwningThread(thread); @@ -135,55 +187,6 @@ class ThreadService extends AbstractService<ThreadService> { } } - @Override - void onFinish() { - try { - runnableCounter.waitFor(0); - - // LOGGER.debug("Waiting for finiteProducerThreads"); - // for (Thread thread : this.finiteProducerThreads) { - // thread.join(); - // } - // - // LOGGER.debug("Waiting for consumerThreads"); - // for (Thread thread : this.consumerThreads) { - // thread.join(); - // } - } catch (InterruptedException e) { - LOGGER.error("Execution has stopped unexpectedly", e); - for (Thread thread : this.finiteProducerThreads) { - thread.interrupt(); - } - - for (Thread thread : this.consumerThreads) { - thread.interrupt(); - } - } - - LOGGER.debug("Interrupting infiniteProducerThreads..."); - for (Thread thread : this.infiniteProducerThreads) { - thread.interrupt(); - } - - if (!exceptions.isEmpty()) { - throw new ExecutionException(exceptions); - } - } - - @Override - void onExecute() { - sendStartingSignal(); - } - - @Override - void onStart() { - startThreads(this.consumerThreads); - startThreads(this.finiteProducerThreads); - startThreads(this.infiniteProducerThreads); - - sendInitializingSignal(); - } - private void startThreads(final Iterable<Thread> threads) { for (Thread thread : threads) { thread.start(); @@ -216,11 +219,8 @@ class ThreadService extends AbstractService<ThreadService> { source.setThreadableStages(this.getThreadableStages()); } - @Override - void onTerminate() { - for (Stage stage : threadableStages.keySet()) { - stage.terminate(); - } + SignalingCounter getRunnableCounter() { + return runnableCounter; } }