diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index a44aeadef17498246d7cce1da210aa2b26e0bd1f..82abb1b2445781efeb6e81b8c6f09d0698dd6c36 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -35,7 +35,7 @@ import teetime.util.Pair; * Represents an Analysis to which stages can be added and executed later. * This needs a {@link AnalysisConfiguration}, * in which the adding and configuring of stages takes place. - * To start the analysis {@link #execute()} needs to be executed. + * To start the analysis {@link #executeBlocking()} needs to be executed. * This class will automatically create threads and join them without any further commitment. */ public final class Analysis implements UncaughtExceptionHandler { @@ -164,7 +164,7 @@ public final class Analysis implements UncaughtExceptionHandler { * * @return a collection of thread/throwable pairs * - * @deprecated since 1.1, replaced by {@link #execute()} + * @deprecated since 1.1, replaced by {@link #executeBlocking()} */ @Deprecated public Collection<Pair<Thread, Throwable>> start() { @@ -196,25 +196,86 @@ public final class Analysis implements UncaughtExceptionHandler { for (Thread thread : this.infiniteProducerThreads) { thread.interrupt(); } - return this.exceptions; } /** - * This method will start the Analysis and all containing stages. + * Calling this method will block the current thread, until the analysis terminates. * * @throws AnalysisException * if at least one exception in one thread has occurred within the analysis. The exception contains the pairs of thread and throwable * * @since 1.1 */ - public void execute() { - start(); + public void waitForTermination() { + + try { + for (Thread thread : this.finiteProducerThreads) { + thread.join(); + } + + for (Thread thread : this.consumerThreads) { + thread.join(); + } + } catch (InterruptedException e) { + LOGGER.error("Analysis has stopped unexpectedly", e); + for (Thread thread : this.finiteProducerThreads) { + thread.interrupt(); + } + + for (Thread thread : this.consumerThreads) { + thread.interrupt(); + } + } + + for (Thread thread : this.infiniteProducerThreads) { + thread.interrupt(); + } + if (!exceptions.isEmpty()) { throw new AnalysisException(exceptions); } } + // public void abortEventually() { + // for (Thread thread : this.finiteProducerThreads) { + // thread.interrupt(); + // } + // + // for (Thread thread : this.consumerThreads) { + // thread.interrupt(); + // } + // + // for (Thread thread : this.infiniteProducerThreads) { + // thread.interrupt(); + // } + // } + + /** + * This method will start the Analysis and block until it is finished. + * + * @throws AnalysisException + * if at least one exception in one thread has occurred within the analysis. The exception contains the pairs of thread and throwable + * + * @since 1.1 + */ + public void executeBlocking() { + executeNonBlocking(); + waitForTermination(); + } + + /** + * This method starts the analysis without waiting for its termination. The method {@link #waitForTermination()} must be called to unsure a correct termination + * of the analysis. + * + * @since 1.1 + */ + public void executeNonBlocking() { + startThreads(this.consumerThreads); + startThreads(this.finiteProducerThreads); + startThreads(this.infiniteProducerThreads); + } + private void startThreads(final Iterable<Thread> threads) { for (Thread thread : threads) { thread.setUncaughtExceptionHandler(this); diff --git a/src/test/java/teetime/framework/TraversorTest.java b/src/test/java/teetime/framework/TraversorTest.java index bf8e5aaeafe82f45f9a11259701c752cc6b3ec64..197a66d2483bdf305205f961fa2f452d19ac4ebe 100644 --- a/src/test/java/teetime/framework/TraversorTest.java +++ b/src/test/java/teetime/framework/TraversorTest.java @@ -47,7 +47,6 @@ public class TraversorTest { public TestConfiguration() { int threads = 2; init = new InitialElementProducer<File>(new File("")); - // final File2Lines f2b = new File2Lines(); f2b = new File2SeqOfWords("UTF-8", 512); distributor = new Distributor<String>(new RoundRobinStrategy2()); diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionHandling.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionHandling.java index b83ddb36c24f20ed28b84017304ea53738b12aa2..370093ccfbef97d89e69676d78f792de329be0aa 100644 --- a/src/test/java/teetime/framework/exceptionHandling/ExceptionHandling.java +++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionHandling.java @@ -33,7 +33,7 @@ public class ExceptionHandling { // @Test(timeout = 5000, expected = RuntimeException.class) public void exceptionPassingAndTermination() { - analysis.execute(); + analysis.executeBlocking(); assertEquals(TestListener.exceptionInvoked, 2); // listener did not kill thread to early } diff --git a/src/test/java/teetime/stage/InstanceOfFilterTest.java b/src/test/java/teetime/stage/InstanceOfFilterTest.java index cfdd85c508e6f81f4d9daaf8d2ea0e706fcc442a..4a85cf416b70634a57ec6dc49c4cf4b0fc160367 100644 --- a/src/test/java/teetime/stage/InstanceOfFilterTest.java +++ b/src/test/java/teetime/stage/InstanceOfFilterTest.java @@ -118,7 +118,7 @@ public class InstanceOfFilterTest { InstanceOfFilterTestConfig config = new InstanceOfFilterTestConfig(); Analysis analysis = new Analysis(config); try { - analysis.execute(); + analysis.executeBlocking(); } catch (AnalysisException e) { Collection<Pair<Thread, Throwable>> thrownExceptions = e.getThrownExceptions(); // TODO: handle exception