Skip to content
Snippets Groups Projects
Commit 051b6556 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

solved #92 and modified how analysis handles thread exceptions; quick

fixes for api change
parent e8b5688a
Branches
Tags
No related merge requests found
...@@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory; ...@@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.IgnoringStageListener; import teetime.framework.exceptionHandling.IgnoringStageListener;
import teetime.framework.exceptionHandling.StageExceptionListener; import teetime.framework.exceptionHandling.StageExceptionListener;
import teetime.framework.signal.TerminatingSignal;
import teetime.framework.signal.ValidatingSignal; import teetime.framework.signal.ValidatingSignal;
import teetime.framework.validation.AnalysisNotValidException; import teetime.framework.validation.AnalysisNotValidException;
import teetime.util.Pair; import teetime.util.Pair;
...@@ -30,6 +31,8 @@ public class Analysis implements UncaughtExceptionHandler { ...@@ -30,6 +31,8 @@ public class Analysis implements UncaughtExceptionHandler {
private final StageExceptionListener listener; private final StageExceptionListener listener;
private boolean executionInterrupted = false;
private final List<Thread> consumerThreads = new LinkedList<Thread>(); private final List<Thread> consumerThreads = new LinkedList<Thread>();
private final List<Thread> finiteProducerThreads = new LinkedList<Thread>(); private final List<Thread> finiteProducerThreads = new LinkedList<Thread>();
private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>(); private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>();
...@@ -138,7 +141,7 @@ public class Analysis implements UncaughtExceptionHandler { ...@@ -138,7 +141,7 @@ public class Analysis implements UncaughtExceptionHandler {
* *
* @return a collection of thread/throwable pairs * @return a collection of thread/throwable pairs
*/ */
public Collection<Pair<Thread, Throwable>> start() { public void start() {
// start analysis // start analysis
startThreads(this.consumerThreads); startThreads(this.consumerThreads);
startThreads(this.finiteProducerThreads); startThreads(this.finiteProducerThreads);
...@@ -155,7 +158,6 @@ public class Analysis implements UncaughtExceptionHandler { ...@@ -155,7 +158,6 @@ public class Analysis implements UncaughtExceptionHandler {
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOGGER.error("Analysis has stopped unexpectedly", e); LOGGER.error("Analysis has stopped unexpectedly", e);
for (Thread thread : this.finiteProducerThreads) { for (Thread thread : this.finiteProducerThreads) {
thread.interrupt(); thread.interrupt();
} }
...@@ -168,8 +170,10 @@ public class Analysis implements UncaughtExceptionHandler { ...@@ -168,8 +170,10 @@ public class Analysis implements UncaughtExceptionHandler {
for (Thread thread : this.infiniteProducerThreads) { for (Thread thread : this.infiniteProducerThreads) {
thread.interrupt(); thread.interrupt();
} }
if (!exceptions.isEmpty()) {
return this.exceptions; throw new RuntimeException("Errors while running analysis"); // TODO: add exceptions
}
// return this.exceptions;
} }
private void startThreads(final Iterable<Thread> threads) { private void startThreads(final Iterable<Thread> threads) {
...@@ -190,6 +194,24 @@ public class Analysis implements UncaughtExceptionHandler { ...@@ -190,6 +194,24 @@ public class Analysis implements UncaughtExceptionHandler {
@Override @Override
public void uncaughtException(final Thread thread, final Throwable throwable) { public void uncaughtException(final Thread thread, final Throwable throwable) {
if (!executionInterrupted) {
executionInterrupted = true;
LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now.");
for (Stage stage : configuration.getThreadableStageJobs()) {
if (stage.getOwningThread() != thread) {
switch (stage.getTerminationStrategy()) {
case BY_SELF_DECISION: {
stage.terminate(); // onSignal would also work, but this will execute in its own Thread
}
case BY_SIGNAL: {
final TerminatingSignal terminatingSignal = new TerminatingSignal();
stage.onSignal(terminatingSignal, null);
}
default:
}
}
}
}
this.exceptions.add(Pair.of(thread, throwable)); this.exceptions.add(Pair.of(thread, throwable));
} }
} }
package teetime.examples.loopStage; package teetime.examples.loopStage;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse;
import java.util.Collection;
import org.junit.Test; import org.junit.Test;
import teetime.framework.Analysis; import teetime.framework.Analysis;
import teetime.util.Pair;
public class FiniteSignalPassingTest { public class FiniteSignalPassingTest {
@Test(timeout = 5000) @Test(timeout = 5000)
// may not run infinitely, so we set an arbitrary timeout that is high enough // may not run infinitely, so we set an arbitrary timeout that is high enough
public void testStartSignalDoesNotEndUpInInfiniteLoop() throws Exception { public void testStartSignalDoesNotEndUpInInfiniteLoop() throws Exception {
boolean exceptionsOccured = false;
LoopStageAnalysisConfiguration configuration = new LoopStageAnalysisConfiguration(); LoopStageAnalysisConfiguration configuration = new LoopStageAnalysisConfiguration();
Analysis analysis = new Analysis(configuration); Analysis analysis = new Analysis(configuration);
analysis.init(); analysis.init();
Collection<Pair<Thread, Throwable>> exceptions = analysis.start(); try {
analysis.start();
assertEquals(0, exceptions.size()); } catch (RuntimeException e) {
exceptionsOccured = true;
}
assertFalse(exceptionsOccured);
} }
} }
...@@ -11,4 +11,9 @@ public class ExceptionTestStage extends AbstractProducerStage { ...@@ -11,4 +11,9 @@ public class ExceptionTestStage extends AbstractProducerStage {
} }
loops++; loops++;
} }
@Override
public TerminationStrategy getTerminationStrategy() {
return TerminationStrategy.BY_SELF_DECISION;
}
} }
...@@ -3,14 +3,9 @@ package teetime.framework; ...@@ -3,14 +3,9 @@ package teetime.framework;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.lang.Thread.State; import java.lang.Thread.State;
import java.util.Collection;
import org.junit.Test; import org.junit.Test;
import teetime.util.Pair;
import com.google.common.base.Joiner;
public class RunnableConsumerStageTest { public class RunnableConsumerStageTest {
@Test @Test
...@@ -68,12 +63,7 @@ public class RunnableConsumerStageTest { ...@@ -68,12 +63,7 @@ public class RunnableConsumerStageTest {
} }
private void start(final Analysis analysis) { private void start(final Analysis analysis) {
Collection<Pair<Thread, Throwable>> exceptions = analysis.start(); analysis.start();
for (Pair<Thread, Throwable> pair : exceptions) { assertEquals(0, 0);
System.err.println(pair.getSecond());
System.err.println(Joiner.on("\n").join(pair.getSecond().getStackTrace()));
throw new RuntimeException(pair.getSecond());
}
assertEquals(0, exceptions.size());
} }
} }
package teetime.stage; package teetime.stage;
import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
import org.junit.Test; import org.junit.Test;
...@@ -18,7 +17,6 @@ import teetime.framework.OutputPort; ...@@ -18,7 +17,6 @@ import teetime.framework.OutputPort;
import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.util.Pair;
public class MultipleInstanceOfFilterTest { public class MultipleInstanceOfFilterTest {
...@@ -51,9 +49,11 @@ public class MultipleInstanceOfFilterTest { ...@@ -51,9 +49,11 @@ public class MultipleInstanceOfFilterTest {
final Analysis analysis = new Analysis(new TestConfiguration(initialInput, integerList, floatList)); final Analysis analysis = new Analysis(new TestConfiguration(initialInput, integerList, floatList));
analysis.init(); analysis.init();
final Collection<Pair<Thread, Throwable>> errors = analysis.start(); try {
analysis.start();
assertThat(errors, is(empty())); } catch (Exception e) {
assertTrue(false);
}
assertThat(integerList, contains(1, 2, 3)); assertThat(integerList, contains(1, 2, 3));
assertThat(floatList, contains(1.5f, 2.5f, 3.5f)); assertThat(floatList, contains(1.5f, 2.5f, 3.5f));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment