Skip to content
Snippets Groups Projects
Commit acc3b2a9 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

some clean-up

parent 90194686
No related branches found
No related tags found
No related merge requests found
......@@ -43,10 +43,6 @@ class ThreadService extends AbstractService<ThreadService> {
}
SignalingCounter getRunnableCounter() {
return runnableCounter;
}
private final Collection<ThreadThrowableContainer> exceptions = new ConcurrentLinkedQueue<ThreadThrowableContainer>();
private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>();
......@@ -75,6 +71,62 @@ class ThreadService extends AbstractService<ThreadService> {
onStart();
}
@Override
void onStart() {
startThreads(this.consumerThreads);
startThreads(this.finiteProducerThreads);
startThreads(this.infiniteProducerThreads);
sendInitializingSignal();
}
@Override
void onExecute() {
sendStartingSignal();
}
@Override
void onTerminate() {
for (Stage stage : threadableStages.keySet()) {
stage.terminate();
}
}
@Override
void onFinish() {
try {
runnableCounter.waitFor(0);
// LOGGER.debug("Waiting for finiteProducerThreads");
// for (Thread thread : this.finiteProducerThreads) {
// thread.join();
// }
//
// LOGGER.debug("Waiting for consumerThreads");
// for (Thread thread : this.consumerThreads) {
// thread.join();
// }
} catch (InterruptedException e) {
LOGGER.error("Execution has stopped unexpectedly", e);
for (Thread thread : this.finiteProducerThreads) {
thread.interrupt();
}
for (Thread thread : this.consumerThreads) {
thread.interrupt();
}
}
LOGGER.debug("Interrupting infiniteProducerThreads...");
for (Thread thread : this.infiniteProducerThreads) {
thread.interrupt();
}
if (!exceptions.isEmpty()) {
throw new ExecutionException(exceptions);
}
}
private void initializeIntraStages(final Set<Stage> intraStages, final Thread thread, final AbstractExceptionListener newListener) {
for (Stage intraStage : intraStages) {
intraStage.setOwningThread(thread);
......@@ -135,55 +187,6 @@ class ThreadService extends AbstractService<ThreadService> {
}
}
@Override
void onFinish() {
try {
runnableCounter.waitFor(0);
// LOGGER.debug("Waiting for finiteProducerThreads");
// for (Thread thread : this.finiteProducerThreads) {
// thread.join();
// }
//
// LOGGER.debug("Waiting for consumerThreads");
// for (Thread thread : this.consumerThreads) {
// thread.join();
// }
} catch (InterruptedException e) {
LOGGER.error("Execution has stopped unexpectedly", e);
for (Thread thread : this.finiteProducerThreads) {
thread.interrupt();
}
for (Thread thread : this.consumerThreads) {
thread.interrupt();
}
}
LOGGER.debug("Interrupting infiniteProducerThreads...");
for (Thread thread : this.infiniteProducerThreads) {
thread.interrupt();
}
if (!exceptions.isEmpty()) {
throw new ExecutionException(exceptions);
}
}
@Override
void onExecute() {
sendStartingSignal();
}
@Override
void onStart() {
startThreads(this.consumerThreads);
startThreads(this.finiteProducerThreads);
startThreads(this.infiniteProducerThreads);
sendInitializingSignal();
}
private void startThreads(final Iterable<Thread> threads) {
for (Thread thread : threads) {
thread.start();
......@@ -216,11 +219,8 @@ class ThreadService extends AbstractService<ThreadService> {
source.setThreadableStages(this.getThreadableStages());
}
@Override
void onTerminate() {
for (Stage stage : threadableStages.keySet()) {
stage.terminate();
}
SignalingCounter getRunnableCounter() {
return runnableCounter;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment