Skip to content
Snippets Groups Projects
Commit 7a5f8349 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

introduced new methods to enable a non blocking execution of an analysis

parent c9be8f81
No related branches found
No related tags found
No related merge requests found
......@@ -35,7 +35,7 @@ import teetime.util.Pair;
* Represents an Analysis to which stages can be added and executed later.
* This needs a {@link AnalysisConfiguration},
* in which the adding and configuring of stages takes place.
* To start the analysis {@link #execute()} needs to be executed.
* To start the analysis {@link #executeBlocking()} needs to be executed.
* This class will automatically create threads and join them without any further commitment.
*/
public final class Analysis implements UncaughtExceptionHandler {
......@@ -164,7 +164,7 @@ public final class Analysis implements UncaughtExceptionHandler {
*
* @return a collection of thread/throwable pairs
*
* @deprecated since 1.1, replaced by {@link #execute()}
* @deprecated since 1.1, replaced by {@link #executeBlocking()}
*/
@Deprecated
public Collection<Pair<Thread, Throwable>> start() {
......@@ -196,25 +196,86 @@ public final class Analysis implements UncaughtExceptionHandler {
for (Thread thread : this.infiniteProducerThreads) {
thread.interrupt();
}
return this.exceptions;
}
/**
* This method will start the Analysis and all containing stages.
* Calling this method will block the current thread, until the analysis terminates.
*
* @throws AnalysisException
* if at least one exception in one thread has occurred within the analysis. The exception contains the pairs of thread and throwable
*
* @since 1.1
*/
public void execute() {
start();
public void waitForTermination() {
try {
for (Thread thread : this.finiteProducerThreads) {
thread.join();
}
for (Thread thread : this.consumerThreads) {
thread.join();
}
} catch (InterruptedException e) {
LOGGER.error("Analysis has stopped unexpectedly", e);
for (Thread thread : this.finiteProducerThreads) {
thread.interrupt();
}
for (Thread thread : this.consumerThreads) {
thread.interrupt();
}
}
for (Thread thread : this.infiniteProducerThreads) {
thread.interrupt();
}
if (!exceptions.isEmpty()) {
throw new AnalysisException(exceptions);
}
}
// public void abortEventually() {
// for (Thread thread : this.finiteProducerThreads) {
// thread.interrupt();
// }
//
// for (Thread thread : this.consumerThreads) {
// thread.interrupt();
// }
//
// for (Thread thread : this.infiniteProducerThreads) {
// thread.interrupt();
// }
// }
/**
* This method will start the Analysis and block until it is finished.
*
* @throws AnalysisException
* if at least one exception in one thread has occurred within the analysis. The exception contains the pairs of thread and throwable
*
* @since 1.1
*/
public void executeBlocking() {
executeNonBlocking();
waitForTermination();
}
/**
* This method starts the analysis without waiting for its termination. The method {@link #waitForTermination()} must be called to unsure a correct termination
* of the analysis.
*
* @since 1.1
*/
public void executeNonBlocking() {
startThreads(this.consumerThreads);
startThreads(this.finiteProducerThreads);
startThreads(this.infiniteProducerThreads);
}
private void startThreads(final Iterable<Thread> threads) {
for (Thread thread : threads) {
thread.setUncaughtExceptionHandler(this);
......
......@@ -47,7 +47,6 @@ public class TraversorTest {
public TestConfiguration() {
int threads = 2;
init = new InitialElementProducer<File>(new File(""));
// final File2Lines f2b = new File2Lines();
f2b = new File2SeqOfWords("UTF-8", 512);
distributor = new Distributor<String>(new RoundRobinStrategy2());
......
......@@ -33,7 +33,7 @@ public class ExceptionHandling {
// @Test(timeout = 5000, expected = RuntimeException.class)
public void exceptionPassingAndTermination() {
analysis.execute();
analysis.executeBlocking();
assertEquals(TestListener.exceptionInvoked, 2); // listener did not kill thread to early
}
......
......@@ -118,7 +118,7 @@ public class InstanceOfFilterTest {
InstanceOfFilterTestConfig config = new InstanceOfFilterTestConfig();
Analysis analysis = new Analysis(config);
try {
analysis.execute();
analysis.executeBlocking();
} catch (AnalysisException e) {
Collection<Pair<Thread, Throwable>> thrownExceptions = e.getThrownExceptions();
// TODO: handle exception
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment