diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index 79c143abfeb328b6924aa63ab692e859953b8368..6dab49dad506e7ae9a8718f089f3f01df688c10d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -56,7 +56,6 @@ public abstract class AbstractStage implements StageWithPort { // return outputPort.send(element); } - @SuppressWarnings("unchecked") private void connectUnconnectedOutputPorts() { for (OutputPort<?> outputPort : this.cachedOutputPorts) { if (null == outputPort.getPipe()) { // if port is unconnected diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java index ee22541cb63b3309d3d04b96d7b8a466fc506d59..ad9b44dd4474d90ef720f9a0d0c1eadc2bc27aaf 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java @@ -1,12 +1,17 @@ package teetime.variant.methodcallWithPorts.framework.core; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.Collection; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class Analysis { +import teetime.util.Pair; + +public class Analysis implements UncaughtExceptionHandler { private static final Logger LOGGER = LoggerFactory.getLogger(Analysis.class); @@ -16,6 +21,8 @@ public class Analysis { private final List<Thread> finiteProducerThreads = new LinkedList<Thread>(); private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>(); + private final Collection<Pair<Thread, Throwable>> exceptions = new ConcurrentLinkedQueue<Pair<Thread, Throwable>>(); + public Analysis(final Configuration configuration) { this.configuration = configuration; } @@ -37,17 +44,24 @@ public class Analysis { } } - public void start() { + /** + * + * @return a map of thread/throwable pair + */ + public Collection<Pair<Thread, Throwable>> start() { // start analysis for (Thread thread : this.consumerThreads) { + thread.setUncaughtExceptionHandler(this); thread.start(); } for (Thread thread : this.finiteProducerThreads) { + thread.setUncaughtExceptionHandler(this); thread.start(); } for (Thread thread : this.infiniteProducerThreads) { + thread.setUncaughtExceptionHandler(this); thread.start(); } @@ -75,9 +89,16 @@ public class Analysis { for (Thread thread : this.infiniteProducerThreads) { thread.interrupt(); } + + return this.exceptions; } public Configuration getConfiguration() { return this.configuration; } + + @Override + public void uncaughtException(final Thread t, final Throwable e) { + this.exceptions.add(Pair.of(t, e)); + } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java index 8298ebaebad317741cf53308e5fd22bbd8bfedc8..22a95ff654035fe3786b888a48536122394c806b 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java @@ -42,6 +42,9 @@ public class RunnableStage implements Runnable { TerminatingSignal terminatingSignal = new TerminatingSignal(); this.stage.onSignal(terminatingSignal, null); + } catch (Error e) { + this.logger.error("Terminating thread due to the following exception: ", e); + throw e; } catch (RuntimeException e) { this.logger.error("Terminating thread due to the following exception: ", e); throw e; diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/loopStage/Countdown.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/loopStage/Countdown.java new file mode 100644 index 0000000000000000000000000000000000000000..cc7dbed62637a16eebb61ec559c7d4ae2580143c --- /dev/null +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/loopStage/Countdown.java @@ -0,0 +1,44 @@ +package teetime.variant.methodcallWithPorts.examples.loopStage; + +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; + +public class Countdown extends ProducerStage<Void> { + + private final InputPort<Integer> countdownInputPort = this.createInputPort(); + + private final OutputPort<Integer> newCountdownOutputPort = this.createOutputPort(); + + private final Integer initialCountdown; + + public Countdown(final Integer initialCountdown) { + this.initialCountdown = initialCountdown; + } + + @Override + public void onStarting() { + this.countdownInputPort.getPipe().add(this.initialCountdown); + super.onStarting(); + } + + @Override + protected void execute() { + Integer countdown = this.countdownInputPort.receive(); + if (countdown == 0) { + this.send(this.outputPort, null); + this.terminate(); + } else { + this.send(this.newCountdownOutputPort, --countdown); + } + } + + public InputPort<Integer> getCountdownInputPort() { + return this.countdownInputPort; + } + + public OutputPort<Integer> getNewCountdownOutputPort() { + return this.newCountdownOutputPort; + } + +} diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/loopStage/FiniteSignalPassingTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/loopStage/FiniteSignalPassingTest.java new file mode 100644 index 0000000000000000000000000000000000000000..f972c021fda2ac05327804aed548da08bf903f25 --- /dev/null +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/loopStage/FiniteSignalPassingTest.java @@ -0,0 +1,24 @@ +package teetime.variant.methodcallWithPorts.examples.loopStage; + +import static org.junit.Assert.assertEquals; + +import java.util.Collection; + +import org.junit.Test; + +import teetime.util.Pair; +import teetime.variant.methodcallWithPorts.framework.core.Analysis; + +public class FiniteSignalPassingTest { + + @Test(timeout = 5000) + // may not run infinitely, so we set an arbitrary timeout that is high enough + public void testStartSignalDoesNotEndUpInInfiniteLoop() throws Exception { + LoopStageAnalysisConfiguration configuration = new LoopStageAnalysisConfiguration(); + Analysis analysis = new Analysis(configuration); + analysis.init(); + Collection<Pair<Thread, Throwable>> exceptions = analysis.start(); + + assertEquals(0, exceptions.size()); + } +} diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/loopStage/LoopStageAnalysisConfiguration.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/loopStage/LoopStageAnalysisConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..0ed395946272c756ff4856e4771fa9459022412f --- /dev/null +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/loopStage/LoopStageAnalysisConfiguration.java @@ -0,0 +1,17 @@ +package teetime.variant.methodcallWithPorts.examples.loopStage; + +import teetime.variant.methodcallWithPorts.framework.core.Configuration; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; + +public class LoopStageAnalysisConfiguration extends Configuration { + + public LoopStageAnalysisConfiguration() { + Countdown countdown = new Countdown(10); + + PipeFactory.INSTANCE.create(ThreadCommunication.INTRA) + .connectPorts(countdown.getNewCountdownOutputPort(), countdown.getCountdownInputPort()); + + this.getFiniteProducerStages().add(countdown); + } +}