diff --git a/src/main/java/teetime/framework/AbstractConsumerStage.java b/src/main/java/teetime/framework/AbstractConsumerStage.java index ca2b6a421d05c0e83b22eb8a9eecf35b745e2b8c..c8935aa9edb2ab0fe6125035b3e698876be87224 100644 --- a/src/main/java/teetime/framework/AbstractConsumerStage.java +++ b/src/main/java/teetime/framework/AbstractConsumerStage.java @@ -15,6 +15,7 @@ */ package teetime.framework; +import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution; import teetime.framework.exceptionHandling.StageException; public abstract class AbstractConsumerStage<I> extends AbstractStage { @@ -35,7 +36,10 @@ public abstract class AbstractConsumerStage<I> extends AbstractStage { try { this.execute(element); } catch (Exception e) { - throw new StageException(e, this); + final FurtherExecution furtherExecution = exceptionHandler.onStageException(e, this); + if (furtherExecution == FurtherExecution.TERMINATE) { + throw new StageException(e, this); + } } } diff --git a/src/main/java/teetime/framework/AbstractProducerStage.java b/src/main/java/teetime/framework/AbstractProducerStage.java index f85c5f8b0b50c01a61854c146fc6116e01b06517..8ee88e594d98c843eaddd0e5c0ed397de0b548e3 100644 --- a/src/main/java/teetime/framework/AbstractProducerStage.java +++ b/src/main/java/teetime/framework/AbstractProducerStage.java @@ -15,6 +15,7 @@ */ package teetime.framework; +import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution; import teetime.framework.exceptionHandling.StageException; /** @@ -39,7 +40,10 @@ public abstract class AbstractProducerStage<O> extends AbstractStage { try { this.execute(); } catch (Exception e) { - throw new StageException(e, this); + final FurtherExecution furtherExecution = this.exceptionHandler.onStageException(e, this); + if (furtherExecution == FurtherExecution.TERMINATE) { + throw new StageException(e, this); + } } } diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java index 428b679bbfc59ee15b9ec4dddde7b04deb907a98..00f4f59fe81c365007fe5ab0be363753a5f9bc2d 100644 --- a/src/main/java/teetime/framework/AbstractRunnableStage.java +++ b/src/main/java/teetime/framework/AbstractRunnableStage.java @@ -18,25 +18,20 @@ package teetime.framework; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import teetime.framework.exceptionHandling.AbstractExceptionListener; -import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution; import teetime.framework.exceptionHandling.StageException; import teetime.framework.signal.TerminatingSignal; abstract class AbstractRunnableStage implements Runnable { - private final AbstractExceptionListener exceptionHandler; - private static final String TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION = "Terminating thread due to the following exception: "; private final Stage stage; @SuppressWarnings("PMD.LoggerIsNotStaticFinal") protected final Logger logger; - public AbstractRunnableStage(final Stage stage, final AbstractExceptionListener exceptionHandler) { + public AbstractRunnableStage(final Stage stage) { this.stage = stage; this.logger = LoggerFactory.getLogger(stage.getClass()); - this.exceptionHandler = exceptionHandler; } @Override @@ -45,19 +40,13 @@ abstract class AbstractRunnableStage implements Runnable { boolean failed = false; try { beforeStageExecution(stage); - - do { - try { + try { + do { executeStage(stage); - } catch (StageException e) { - final FurtherExecution furtherExecution = this.exceptionHandler.onStageException(e, e.getThrowingStage()); - if (furtherExecution == FurtherExecution.TERMINATE) { - this.stage.terminate(); - failed = true; - } - } - } while (!stage.shouldBeTerminated()); - + } while (!stage.shouldBeTerminated()); + } catch (StageException e) { + this.stage.terminate(); + } afterStageExecution(stage); } catch (RuntimeException e) { diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index d62b3831ec08d4f2792aa56f50f4cdb764826b5f..711c5c90b1b925556b4a77caf0bf2c875c20ecbd 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -136,30 +136,36 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught newListener = factory.create(); switch (stage.getTerminationStrategy()) { case BY_SIGNAL: { - final RunnableConsumerStage runnableConsumerStage = new RunnableConsumerStage(stage, newListener); + final RunnableConsumerStage runnableConsumerStage = new RunnableConsumerStage(stage); final Thread thread = new Thread(runnableConsumerStage); + stage.setExceptionHandler(newListener); for (Stage intraStage : intraStages) { intraStage.setOwningThread(thread); + intraStage.setExceptionHandler(newListener); } this.consumerThreads.add(thread); thread.setName(stage.getId()); break; } case BY_SELF_DECISION: { - final RunnableProducerStage runnable = new RunnableProducerStage(stage, newListener); + final RunnableProducerStage runnable = new RunnableProducerStage(stage); final Thread thread = new Thread(runnable); + stage.setExceptionHandler(newListener); for (Stage intraStage : intraStages) { intraStage.setOwningThread(thread); + intraStage.setExceptionHandler(newListener); } this.finiteProducerThreads.add(thread); thread.setName(stage.getId()); break; } case BY_INTERRUPT: { - final RunnableProducerStage runnable = new RunnableProducerStage(stage, newListener); + final RunnableProducerStage runnable = new RunnableProducerStage(stage); final Thread thread = new Thread(runnable); + stage.setExceptionHandler(newListener); for (Stage intraStage : intraStages) { intraStage.setOwningThread(thread); + intraStage.setExceptionHandler(newListener); } this.infiniteProducerThreads.add(thread); thread.setName(stage.getId()); diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 47396601d5e9a03c541a966ef022296ea68e76ba..5a8e39643208d88882ca578bd0a6475813fb1e16 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -15,7 +15,6 @@ */ package teetime.framework; -import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.idle.IdleStrategy; import teetime.framework.idle.YieldStrategy; import teetime.framework.signal.ISignal; @@ -32,12 +31,12 @@ final class RunnableConsumerStage extends AbstractRunnableStage { * @param stage * to execute within an own thread */ - public RunnableConsumerStage(final Stage stage, final AbstractExceptionListener exceptionListener) { - this(stage, new YieldStrategy(), exceptionListener); + public RunnableConsumerStage(final Stage stage) { + this(stage, new YieldStrategy()); } - public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy, final AbstractExceptionListener exceptionListener) { - super(stage, exceptionListener); + public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy) { + super(stage); this.inputPorts = stage.getInputPorts(); // FIXME should getInputPorts() really be defined in Stage? } diff --git a/src/main/java/teetime/framework/RunnableProducerStage.java b/src/main/java/teetime/framework/RunnableProducerStage.java index 0e82813afaea111957a5537887fdb5aa63376327..2b7b556af36be9f5dd00b4c827c48dea1b831dc5 100644 --- a/src/main/java/teetime/framework/RunnableProducerStage.java +++ b/src/main/java/teetime/framework/RunnableProducerStage.java @@ -15,14 +15,13 @@ */ package teetime.framework; -import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.signal.StartingSignal; import teetime.framework.signal.TerminatingSignal; public final class RunnableProducerStage extends AbstractRunnableStage { - public RunnableProducerStage(final Stage stage, final AbstractExceptionListener listener) { - super(stage, listener); + public RunnableProducerStage(final Stage stage) { + super(stage); } @Override diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index dc77a8b3598c111d3cda2a4736dcc94d2c690a0f..ab567e82c71e7ed2fde7ce84aa5786be93671c49 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; @@ -42,6 +43,8 @@ public abstract class Stage { @SuppressWarnings("PMD.LoggerIsNotStaticFinal") protected final Logger logger; + protected AbstractExceptionListener exceptionHandler; + /** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */ protected Thread owningThread; @@ -131,4 +134,7 @@ public abstract class Stage { @SuppressWarnings("PMD.SignatureDeclareThrowsException") public abstract void onTerminating() throws Exception; + protected final void setExceptionHandler(final AbstractExceptionListener exceptionHandler) { + this.exceptionHandler = exceptionHandler; + } } diff --git a/src/main/java/teetime/framework/exceptionHandling/IgnoringExceptionListener.java b/src/main/java/teetime/framework/exceptionHandling/IgnoringExceptionListener.java index 7d1d06a20075455492ff126aea5d6d3d206027da..71499e81bdfc3b965c5fecd92ab58f82e12b2b5f 100644 --- a/src/main/java/teetime/framework/exceptionHandling/IgnoringExceptionListener.java +++ b/src/main/java/teetime/framework/exceptionHandling/IgnoringExceptionListener.java @@ -17,7 +17,7 @@ package teetime.framework.exceptionHandling; import teetime.framework.Stage; -public class IgnoringExceptionListener extends AbstractExceptionListener { +class IgnoringExceptionListener extends AbstractExceptionListener { @Override public FurtherExecution onStageException(final Exception e, final Stage throwingStage) { diff --git a/src/main/java/teetime/framework/exceptionHandling/LoggingExceptionListener.java b/src/main/java/teetime/framework/exceptionHandling/LoggingExceptionListener.java index 928994f0784354a9e43662b6f67b5ce01c337a2d..53dce3295a0b561e081a6d51c27eb6af941f119b 100644 --- a/src/main/java/teetime/framework/exceptionHandling/LoggingExceptionListener.java +++ b/src/main/java/teetime/framework/exceptionHandling/LoggingExceptionListener.java @@ -17,7 +17,7 @@ package teetime.framework.exceptionHandling; import teetime.framework.Stage; -public class LoggingExceptionListener extends AbstractExceptionListener { +class LoggingExceptionListener extends AbstractExceptionListener { @Override public FurtherExecution onStageException(final Exception e, final Stage throwingStage) { diff --git a/src/main/java/teetime/framework/exceptionHandling/TerminatingExceptionListener.java b/src/main/java/teetime/framework/exceptionHandling/TerminatingExceptionListener.java index 324d6430377629765e8e0e96dbc70fc41b338b56..7568264864061bfe4deb53c6a94e1d8eb61ef167 100644 --- a/src/main/java/teetime/framework/exceptionHandling/TerminatingExceptionListener.java +++ b/src/main/java/teetime/framework/exceptionHandling/TerminatingExceptionListener.java @@ -17,7 +17,7 @@ package teetime.framework.exceptionHandling; import teetime.framework.Stage; -public class TerminatingExceptionListener extends AbstractExceptionListener { +class TerminatingExceptionListener extends AbstractExceptionListener { @Override public FurtherExecution onStageException(final Exception e, final Stage throwingStage) { diff --git a/src/performancetest/java/teetime/examples/experiment09pipeimpls/MethodCallThroughputAnalysis9.java b/src/performancetest/java/teetime/examples/experiment09pipeimpls/MethodCallThroughputAnalysis9.java index 5965ff7882a94036a4d64e3013e3296c21c8f0a2..ef831990a9bb8a693a3ebed0241b9f2210513744 100644 --- a/src/performancetest/java/teetime/examples/experiment09pipeimpls/MethodCallThroughputAnalysis9.java +++ b/src/performancetest/java/teetime/examples/experiment09pipeimpls/MethodCallThroughputAnalysis9.java @@ -20,7 +20,6 @@ import java.util.List; import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableProducerStage; import teetime.framework.Stage; -import teetime.framework.exceptionHandling.IgnoringExceptionListener; import teetime.framework.pipe.IPipeFactory; import teetime.stage.CollectorSink; import teetime.stage.NoopFilter; @@ -45,7 +44,7 @@ public class MethodCallThroughputAnalysis9 { public void init(final IPipeFactory pipeFactory) { Stage pipeline = this.buildPipeline(pipeFactory); - this.runnable = new RunnableProducerStage(pipeline, new IgnoringExceptionListener()); + this.runnable = new RunnableProducerStage(pipeline); } /** diff --git a/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java b/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java index e1267e8576490072cc70a97b4d1269da8d230001..0866ada04cc23a0d16769b22e7044e2cf22765b2 100644 --- a/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java +++ b/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java @@ -20,7 +20,6 @@ import java.util.List; import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableProducerStage; import teetime.framework.Stage; -import teetime.framework.exceptionHandling.IgnoringExceptionListener; import teetime.framework.pipe.UnorderedGrowablePipe; import teetime.stage.CollectorSink; import teetime.stage.NoopFilter; @@ -45,7 +44,7 @@ public class MethodCallThroughputAnalysis11 { public void init() { Stage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator); - this.runnable = new RunnableProducerStage(pipeline, new IgnoringExceptionListener()); + this.runnable = new RunnableProducerStage(pipeline); } private OldHeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(final long numInputObjects, diff --git a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java index 48e57ba264e7812944791aadfe196cd0f1c9918f..be73f9db478370dedc30e91069d85486a38c90f1 100644 --- a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java +++ b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java @@ -21,7 +21,6 @@ import teetime.framework.AnalysisConfiguration; import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableProducerStage; import teetime.framework.Stage; -import teetime.framework.exceptionHandling.IgnoringExceptionListener; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.OrderedGrowableArrayPipe; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; @@ -65,10 +64,10 @@ public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration { public void init() { OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline(); - this.clockRunnable = new RunnableProducerStage(clockPipeline, new IgnoringExceptionListener()); + this.clockRunnable = new RunnableProducerStage(clockPipeline); Stage pipeline = this.buildPipeline(this.clock); - this.runnable = new RunnableProducerStage(pipeline, new IgnoringExceptionListener()); + this.runnable = new RunnableProducerStage(pipeline); } private OldHeadPipeline<Clock, Sink<Long>> buildClockPipeline() { diff --git a/src/test/java/teetime/framework/StageTest.java b/src/test/java/teetime/framework/StageTest.java index 639a806e9b532e107b5f6c1733d9e60e82d43060..ea66d118ff5989d312cf2f84d9ebaaf019ea1b55 100644 --- a/src/test/java/teetime/framework/StageTest.java +++ b/src/test/java/teetime/framework/StageTest.java @@ -15,7 +15,10 @@ */ package teetime.framework; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import org.junit.Assert; import org.junit.Test; @@ -49,6 +52,8 @@ public class StageTest { TestConfig tc = new TestConfig(); new Analysis<TestConfig>(tc); assertEquals(tc.init.owningThread, tc.delay.owningThread); + assertThat(tc.delay.exceptionHandler, is(notNullValue())); + assertEquals(tc.init.exceptionHandler, tc.delay.exceptionHandler); } private static class TestConfig extends AnalysisConfiguration { diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java index 8aded34b468aa470c235e98b7b988dc1ec17e950..d6237ed4f8d8bf2b3deadd283726394d877a68a6 100644 --- a/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java +++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java @@ -18,21 +18,20 @@ package teetime.framework.exceptionHandling; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import org.junit.Ignore; import org.junit.Test; import teetime.framework.Analysis; public class ExceptionHandlingTest { - private Analysis analysis; + private Analysis<ExceptionTestConfiguration> analysis; // @Before public void newInstances() { - analysis = new Analysis(new ExceptionTestConfiguration(), new TestListenerFactory()); + analysis = new Analysis<ExceptionTestConfiguration>(new ExceptionTestConfiguration(), new TestListenerFactory()); } - // @Test(timeout = 5000, expected = RuntimeException.class) + @Test(timeout = 5000, expected = RuntimeException.class) public void exceptionPassingAndTermination() { analysis.executeBlocking(); assertEquals(TestListener.exceptionInvoked, 2); // listener did not kill thread to early @@ -49,7 +48,6 @@ public class ExceptionHandlingTest { * SpScPipe.add and cycle through the sleep method. As a result, the thread will never return to the point * where it checks if it should be terminated. */ - @Ignore @Test(timeout = 30000) public void forAFewTimes() { for (int i = 0; i < 1000; i++) {