diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index ab75d4b88255e86c36cdabf4d5ebaca58b286bee..71f746199a2d0e570f831d81f1f5479521045eef 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory; import teetime.framework.exceptionHandling.IgnoringStageListener; import teetime.framework.exceptionHandling.StageExceptionListener; +import teetime.framework.signal.TerminatingSignal; import teetime.framework.signal.ValidatingSignal; import teetime.framework.validation.AnalysisNotValidException; import teetime.util.Pair; @@ -30,6 +31,8 @@ public class Analysis implements UncaughtExceptionHandler { private final StageExceptionListener listener; + private boolean executionInterrupted = false; + private final List<Thread> consumerThreads = new LinkedList<Thread>(); private final List<Thread> finiteProducerThreads = new LinkedList<Thread>(); private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>(); @@ -138,7 +141,7 @@ public class Analysis implements UncaughtExceptionHandler { * * @return a collection of thread/throwable pairs */ - public Collection<Pair<Thread, Throwable>> start() { + public void start() { // start analysis startThreads(this.consumerThreads); startThreads(this.finiteProducerThreads); @@ -155,7 +158,6 @@ public class Analysis implements UncaughtExceptionHandler { } } catch (InterruptedException e) { LOGGER.error("Analysis has stopped unexpectedly", e); - for (Thread thread : this.finiteProducerThreads) { thread.interrupt(); } @@ -168,8 +170,10 @@ public class Analysis implements UncaughtExceptionHandler { for (Thread thread : this.infiniteProducerThreads) { thread.interrupt(); } - - return this.exceptions; + if (!exceptions.isEmpty()) { + throw new RuntimeException("Errors while running analysis"); // TODO: add exceptions + } + // return this.exceptions; } private void startThreads(final Iterable<Thread> threads) { @@ -190,6 +194,24 @@ public class Analysis implements UncaughtExceptionHandler { @Override public void uncaughtException(final Thread thread, final Throwable throwable) { + if (!executionInterrupted) { + executionInterrupted = true; + LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now."); + for (Stage stage : configuration.getThreadableStageJobs()) { + if (stage.getOwningThread() != thread) { + switch (stage.getTerminationStrategy()) { + case BY_SELF_DECISION: { + stage.terminate(); // onSignal would also work, but this will execute in its own Thread + } + case BY_SIGNAL: { + final TerminatingSignal terminatingSignal = new TerminatingSignal(); + stage.onSignal(terminatingSignal, null); + } + default: + } + } + } + } this.exceptions.add(Pair.of(thread, throwable)); } } diff --git a/src/performancetest/java/teetime/examples/loopStage/FiniteSignalPassingTest.java b/src/performancetest/java/teetime/examples/loopStage/FiniteSignalPassingTest.java index ffcef1df21063d6bac47c197222e4632d000c316..f1637c8e33b578ccbd45cfc8dbe892164c01d5d7 100644 --- a/src/performancetest/java/teetime/examples/loopStage/FiniteSignalPassingTest.java +++ b/src/performancetest/java/teetime/examples/loopStage/FiniteSignalPassingTest.java @@ -1,24 +1,25 @@ package teetime.examples.loopStage; -import static org.junit.Assert.assertEquals; - -import java.util.Collection; +import static org.junit.Assert.assertFalse; import org.junit.Test; import teetime.framework.Analysis; -import teetime.util.Pair; public class FiniteSignalPassingTest { @Test(timeout = 5000) // may not run infinitely, so we set an arbitrary timeout that is high enough public void testStartSignalDoesNotEndUpInInfiniteLoop() throws Exception { + boolean exceptionsOccured = false; LoopStageAnalysisConfiguration configuration = new LoopStageAnalysisConfiguration(); Analysis analysis = new Analysis(configuration); analysis.init(); - Collection<Pair<Thread, Throwable>> exceptions = analysis.start(); - - assertEquals(0, exceptions.size()); + try { + analysis.start(); + } catch (RuntimeException e) { + exceptionsOccured = true; + } + assertFalse(exceptionsOccured); } } diff --git a/src/test/java/teetime/framework/ExceptionTestStage.java b/src/test/java/teetime/framework/ExceptionTestStage.java index c6aa4e64abfa6a69aa2f3f3da43e7a4653950e5d..ea37feaa62469a67f355024b4d7e91a6d8b459bf 100644 --- a/src/test/java/teetime/framework/ExceptionTestStage.java +++ b/src/test/java/teetime/framework/ExceptionTestStage.java @@ -11,4 +11,9 @@ public class ExceptionTestStage extends AbstractProducerStage { } loops++; } + + @Override + public TerminationStrategy getTerminationStrategy() { + return TerminationStrategy.BY_SELF_DECISION; + } } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTest.java b/src/test/java/teetime/framework/RunnableConsumerStageTest.java index 02335317b1ea182bebb49e4bd00697cf2ba5082c..5e2de12121d1050dce49140c75f178be5aab745c 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTest.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTest.java @@ -3,14 +3,9 @@ package teetime.framework; import static org.junit.Assert.assertEquals; import java.lang.Thread.State; -import java.util.Collection; import org.junit.Test; -import teetime.util.Pair; - -import com.google.common.base.Joiner; - public class RunnableConsumerStageTest { @Test @@ -68,12 +63,7 @@ public class RunnableConsumerStageTest { } private void start(final Analysis analysis) { - Collection<Pair<Thread, Throwable>> exceptions = analysis.start(); - for (Pair<Thread, Throwable> pair : exceptions) { - System.err.println(pair.getSecond()); - System.err.println(Joiner.on("\n").join(pair.getSecond().getStackTrace())); - throw new RuntimeException(pair.getSecond()); - } - assertEquals(0, exceptions.size()); + analysis.start(); + assertEquals(0, 0); } } diff --git a/src/test/java/teetime/stage/MultipleInstanceOfFilterTest.java b/src/test/java/teetime/stage/MultipleInstanceOfFilterTest.java index 4740dea2998c6d47bc5b9947d48c6c516e292be7..d1f7fea4ae83644f17039dc4255c12b76da809d7 100644 --- a/src/test/java/teetime/stage/MultipleInstanceOfFilterTest.java +++ b/src/test/java/teetime/stage/MultipleInstanceOfFilterTest.java @@ -1,13 +1,12 @@ package teetime.stage; import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.List; import org.junit.Test; @@ -18,7 +17,6 @@ import teetime.framework.OutputPort; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; -import teetime.util.Pair; public class MultipleInstanceOfFilterTest { @@ -51,9 +49,11 @@ public class MultipleInstanceOfFilterTest { final Analysis analysis = new Analysis(new TestConfiguration(initialInput, integerList, floatList)); analysis.init(); - final Collection<Pair<Thread, Throwable>> errors = analysis.start(); - - assertThat(errors, is(empty())); + try { + analysis.start(); + } catch (Exception e) { + assertTrue(false); + } assertThat(integerList, contains(1, 2, 3)); assertThat(floatList, contains(1.5f, 2.5f, 3.5f));