From d2d2474cf8f9b11b1037de95b52bfdbcb85cc89d Mon Sep 17 00:00:00 2001
From: Nelson Tavares de Sousa <ntd@informatik.uni-kiel.de>
Date: Thu, 16 Apr 2015 16:37:01 +0200
Subject: [PATCH] moved exception handling from "thread head" to "throwing
 stage"; modified scope of Listener implementations (should only be
 instantiated by factories)

---
 .../framework/AbstractConsumerStage.java      |  6 ++++-
 .../framework/AbstractProducerStage.java      |  6 ++++-
 .../framework/AbstractRunnableStage.java      | 25 ++++++-------------
 src/main/java/teetime/framework/Analysis.java | 12 ++++++---
 .../framework/RunnableConsumerStage.java      |  9 +++----
 .../framework/RunnableProducerStage.java      |  5 ++--
 src/main/java/teetime/framework/Stage.java    |  6 +++++
 .../IgnoringExceptionListener.java            |  2 +-
 .../LoggingExceptionListener.java             |  2 +-
 .../TerminatingExceptionListener.java         |  2 +-
 .../MethodCallThroughputAnalysis9.java        |  3 +--
 .../MethodCallThroughputAnalysis11.java       |  3 +--
 .../MethodCallThroughputAnalysis15.java       |  5 ++--
 .../java/teetime/framework/StageTest.java     |  5 ++++
 .../ExceptionHandlingTest.java                |  8 +++---
 15 files changed, 53 insertions(+), 46 deletions(-)

diff --git a/src/main/java/teetime/framework/AbstractConsumerStage.java b/src/main/java/teetime/framework/AbstractConsumerStage.java
index ca2b6a42..c8935aa9 100644
--- a/src/main/java/teetime/framework/AbstractConsumerStage.java
+++ b/src/main/java/teetime/framework/AbstractConsumerStage.java
@@ -15,6 +15,7 @@
  */
 package teetime.framework;
 
+import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
 import teetime.framework.exceptionHandling.StageException;
 
 public abstract class AbstractConsumerStage<I> extends AbstractStage {
@@ -35,7 +36,10 @@ public abstract class AbstractConsumerStage<I> extends AbstractStage {
 		try {
 			this.execute(element);
 		} catch (Exception e) {
-			throw new StageException(e, this);
+			final FurtherExecution furtherExecution = exceptionHandler.onStageException(e, this);
+			if (furtherExecution == FurtherExecution.TERMINATE) {
+				throw new StageException(e, this);
+			}
 		}
 	}
 
diff --git a/src/main/java/teetime/framework/AbstractProducerStage.java b/src/main/java/teetime/framework/AbstractProducerStage.java
index f85c5f8b..8ee88e59 100644
--- a/src/main/java/teetime/framework/AbstractProducerStage.java
+++ b/src/main/java/teetime/framework/AbstractProducerStage.java
@@ -15,6 +15,7 @@
  */
 package teetime.framework;
 
+import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
 import teetime.framework.exceptionHandling.StageException;
 
 /**
@@ -39,7 +40,10 @@ public abstract class AbstractProducerStage<O> extends AbstractStage {
 		try {
 			this.execute();
 		} catch (Exception e) {
-			throw new StageException(e, this);
+			final FurtherExecution furtherExecution = this.exceptionHandler.onStageException(e, this);
+			if (furtherExecution == FurtherExecution.TERMINATE) {
+				throw new StageException(e, this);
+			}
 		}
 	}
 
diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java
index 428b679b..00f4f59f 100644
--- a/src/main/java/teetime/framework/AbstractRunnableStage.java
+++ b/src/main/java/teetime/framework/AbstractRunnableStage.java
@@ -18,25 +18,20 @@ package teetime.framework;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import teetime.framework.exceptionHandling.AbstractExceptionListener;
-import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
 import teetime.framework.exceptionHandling.StageException;
 import teetime.framework.signal.TerminatingSignal;
 
 abstract class AbstractRunnableStage implements Runnable {
 
-	private final AbstractExceptionListener exceptionHandler;
-
 	private static final String TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION = "Terminating thread due to the following exception: ";
 
 	private final Stage stage;
 	@SuppressWarnings("PMD.LoggerIsNotStaticFinal")
 	protected final Logger logger;
 
-	public AbstractRunnableStage(final Stage stage, final AbstractExceptionListener exceptionHandler) {
+	public AbstractRunnableStage(final Stage stage) {
 		this.stage = stage;
 		this.logger = LoggerFactory.getLogger(stage.getClass());
-		this.exceptionHandler = exceptionHandler;
 	}
 
 	@Override
@@ -45,19 +40,13 @@ abstract class AbstractRunnableStage implements Runnable {
 		boolean failed = false;
 		try {
 			beforeStageExecution(stage);
-
-			do {
-				try {
+			try {
+				do {
 					executeStage(stage);
-				} catch (StageException e) {
-					final FurtherExecution furtherExecution = this.exceptionHandler.onStageException(e, e.getThrowingStage());
-					if (furtherExecution == FurtherExecution.TERMINATE) {
-						this.stage.terminate();
-						failed = true;
-					}
-				}
-			} while (!stage.shouldBeTerminated());
-
+				} while (!stage.shouldBeTerminated());
+			} catch (StageException e) {
+				this.stage.terminate();
+			}
 			afterStageExecution(stage);
 
 		} catch (RuntimeException e) {
diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java
index d62b3831..711c5c90 100644
--- a/src/main/java/teetime/framework/Analysis.java
+++ b/src/main/java/teetime/framework/Analysis.java
@@ -136,30 +136,36 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
 			newListener = factory.create();
 			switch (stage.getTerminationStrategy()) {
 			case BY_SIGNAL: {
-				final RunnableConsumerStage runnableConsumerStage = new RunnableConsumerStage(stage, newListener);
+				final RunnableConsumerStage runnableConsumerStage = new RunnableConsumerStage(stage);
 				final Thread thread = new Thread(runnableConsumerStage);
+				stage.setExceptionHandler(newListener);
 				for (Stage intraStage : intraStages) {
 					intraStage.setOwningThread(thread);
+					intraStage.setExceptionHandler(newListener);
 				}
 				this.consumerThreads.add(thread);
 				thread.setName(stage.getId());
 				break;
 			}
 			case BY_SELF_DECISION: {
-				final RunnableProducerStage runnable = new RunnableProducerStage(stage, newListener);
+				final RunnableProducerStage runnable = new RunnableProducerStage(stage);
 				final Thread thread = new Thread(runnable);
+				stage.setExceptionHandler(newListener);
 				for (Stage intraStage : intraStages) {
 					intraStage.setOwningThread(thread);
+					intraStage.setExceptionHandler(newListener);
 				}
 				this.finiteProducerThreads.add(thread);
 				thread.setName(stage.getId());
 				break;
 			}
 			case BY_INTERRUPT: {
-				final RunnableProducerStage runnable = new RunnableProducerStage(stage, newListener);
+				final RunnableProducerStage runnable = new RunnableProducerStage(stage);
 				final Thread thread = new Thread(runnable);
+				stage.setExceptionHandler(newListener);
 				for (Stage intraStage : intraStages) {
 					intraStage.setOwningThread(thread);
+					intraStage.setExceptionHandler(newListener);
 				}
 				this.infiniteProducerThreads.add(thread);
 				thread.setName(stage.getId());
diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java
index 47396601..5a8e3964 100644
--- a/src/main/java/teetime/framework/RunnableConsumerStage.java
+++ b/src/main/java/teetime/framework/RunnableConsumerStage.java
@@ -15,7 +15,6 @@
  */
 package teetime.framework;
 
-import teetime.framework.exceptionHandling.AbstractExceptionListener;
 import teetime.framework.idle.IdleStrategy;
 import teetime.framework.idle.YieldStrategy;
 import teetime.framework.signal.ISignal;
@@ -32,12 +31,12 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
 	 * @param stage
 	 *            to execute within an own thread
 	 */
-	public RunnableConsumerStage(final Stage stage, final AbstractExceptionListener exceptionListener) {
-		this(stage, new YieldStrategy(), exceptionListener);
+	public RunnableConsumerStage(final Stage stage) {
+		this(stage, new YieldStrategy());
 	}
 
-	public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy, final AbstractExceptionListener exceptionListener) {
-		super(stage, exceptionListener);
+	public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy) {
+		super(stage);
 		this.inputPorts = stage.getInputPorts(); // FIXME should getInputPorts() really be defined in Stage?
 	}
 
diff --git a/src/main/java/teetime/framework/RunnableProducerStage.java b/src/main/java/teetime/framework/RunnableProducerStage.java
index 0e82813a..2b7b556a 100644
--- a/src/main/java/teetime/framework/RunnableProducerStage.java
+++ b/src/main/java/teetime/framework/RunnableProducerStage.java
@@ -15,14 +15,13 @@
  */
 package teetime.framework;
 
-import teetime.framework.exceptionHandling.AbstractExceptionListener;
 import teetime.framework.signal.StartingSignal;
 import teetime.framework.signal.TerminatingSignal;
 
 public final class RunnableProducerStage extends AbstractRunnableStage {
 
-	public RunnableProducerStage(final Stage stage, final AbstractExceptionListener listener) {
-		super(stage, listener);
+	public RunnableProducerStage(final Stage stage) {
+		super(stage);
 	}
 
 	@Override
diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java
index dc77a8b3..ab567e82 100644
--- a/src/main/java/teetime/framework/Stage.java
+++ b/src/main/java/teetime/framework/Stage.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import teetime.framework.exceptionHandling.AbstractExceptionListener;
 import teetime.framework.signal.ISignal;
 import teetime.framework.validation.InvalidPortConnection;
 
@@ -42,6 +43,8 @@ public abstract class Stage {
 	@SuppressWarnings("PMD.LoggerIsNotStaticFinal")
 	protected final Logger logger;
 
+	protected AbstractExceptionListener exceptionHandler;
+
 	/** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */
 	protected Thread owningThread;
 
@@ -131,4 +134,7 @@ public abstract class Stage {
 	@SuppressWarnings("PMD.SignatureDeclareThrowsException")
 	public abstract void onTerminating() throws Exception;
 
+	protected final void setExceptionHandler(final AbstractExceptionListener exceptionHandler) {
+		this.exceptionHandler = exceptionHandler;
+	}
 }
diff --git a/src/main/java/teetime/framework/exceptionHandling/IgnoringExceptionListener.java b/src/main/java/teetime/framework/exceptionHandling/IgnoringExceptionListener.java
index 7d1d06a2..71499e81 100644
--- a/src/main/java/teetime/framework/exceptionHandling/IgnoringExceptionListener.java
+++ b/src/main/java/teetime/framework/exceptionHandling/IgnoringExceptionListener.java
@@ -17,7 +17,7 @@ package teetime.framework.exceptionHandling;
 
 import teetime.framework.Stage;
 
-public class IgnoringExceptionListener extends AbstractExceptionListener {
+class IgnoringExceptionListener extends AbstractExceptionListener {
 
 	@Override
 	public FurtherExecution onStageException(final Exception e, final Stage throwingStage) {
diff --git a/src/main/java/teetime/framework/exceptionHandling/LoggingExceptionListener.java b/src/main/java/teetime/framework/exceptionHandling/LoggingExceptionListener.java
index 928994f0..53dce329 100644
--- a/src/main/java/teetime/framework/exceptionHandling/LoggingExceptionListener.java
+++ b/src/main/java/teetime/framework/exceptionHandling/LoggingExceptionListener.java
@@ -17,7 +17,7 @@ package teetime.framework.exceptionHandling;
 
 import teetime.framework.Stage;
 
-public class LoggingExceptionListener extends AbstractExceptionListener {
+class LoggingExceptionListener extends AbstractExceptionListener {
 
 	@Override
 	public FurtherExecution onStageException(final Exception e, final Stage throwingStage) {
diff --git a/src/main/java/teetime/framework/exceptionHandling/TerminatingExceptionListener.java b/src/main/java/teetime/framework/exceptionHandling/TerminatingExceptionListener.java
index 324d6430..75682648 100644
--- a/src/main/java/teetime/framework/exceptionHandling/TerminatingExceptionListener.java
+++ b/src/main/java/teetime/framework/exceptionHandling/TerminatingExceptionListener.java
@@ -17,7 +17,7 @@ package teetime.framework.exceptionHandling;
 
 import teetime.framework.Stage;
 
-public class TerminatingExceptionListener extends AbstractExceptionListener {
+class TerminatingExceptionListener extends AbstractExceptionListener {
 
 	@Override
 	public FurtherExecution onStageException(final Exception e, final Stage throwingStage) {
diff --git a/src/performancetest/java/teetime/examples/experiment09pipeimpls/MethodCallThroughputAnalysis9.java b/src/performancetest/java/teetime/examples/experiment09pipeimpls/MethodCallThroughputAnalysis9.java
index 5965ff78..ef831990 100644
--- a/src/performancetest/java/teetime/examples/experiment09pipeimpls/MethodCallThroughputAnalysis9.java
+++ b/src/performancetest/java/teetime/examples/experiment09pipeimpls/MethodCallThroughputAnalysis9.java
@@ -20,7 +20,6 @@ import java.util.List;
 import teetime.framework.OldHeadPipeline;
 import teetime.framework.RunnableProducerStage;
 import teetime.framework.Stage;
-import teetime.framework.exceptionHandling.IgnoringExceptionListener;
 import teetime.framework.pipe.IPipeFactory;
 import teetime.stage.CollectorSink;
 import teetime.stage.NoopFilter;
@@ -45,7 +44,7 @@ public class MethodCallThroughputAnalysis9 {
 
 	public void init(final IPipeFactory pipeFactory) {
 		Stage pipeline = this.buildPipeline(pipeFactory);
-		this.runnable = new RunnableProducerStage(pipeline, new IgnoringExceptionListener());
+		this.runnable = new RunnableProducerStage(pipeline);
 	}
 
 	/**
diff --git a/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java b/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java
index e1267e85..0866ada0 100644
--- a/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java
+++ b/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java
@@ -20,7 +20,6 @@ import java.util.List;
 import teetime.framework.OldHeadPipeline;
 import teetime.framework.RunnableProducerStage;
 import teetime.framework.Stage;
-import teetime.framework.exceptionHandling.IgnoringExceptionListener;
 import teetime.framework.pipe.UnorderedGrowablePipe;
 import teetime.stage.CollectorSink;
 import teetime.stage.NoopFilter;
@@ -45,7 +44,7 @@ public class MethodCallThroughputAnalysis11 {
 
 	public void init() {
 		Stage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator);
-		this.runnable = new RunnableProducerStage(pipeline, new IgnoringExceptionListener());
+		this.runnable = new RunnableProducerStage(pipeline);
 	}
 
 	private OldHeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(final long numInputObjects,
diff --git a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java
index 48e57ba2..be73f9db 100644
--- a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java
+++ b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java
@@ -21,7 +21,6 @@ import teetime.framework.AnalysisConfiguration;
 import teetime.framework.OldHeadPipeline;
 import teetime.framework.RunnableProducerStage;
 import teetime.framework.Stage;
-import teetime.framework.exceptionHandling.IgnoringExceptionListener;
 import teetime.framework.pipe.IPipeFactory;
 import teetime.framework.pipe.OrderedGrowableArrayPipe;
 import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
@@ -65,10 +64,10 @@ public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration {
 
 	public void init() {
 		OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline();
-		this.clockRunnable = new RunnableProducerStage(clockPipeline, new IgnoringExceptionListener());
+		this.clockRunnable = new RunnableProducerStage(clockPipeline);
 
 		Stage pipeline = this.buildPipeline(this.clock);
-		this.runnable = new RunnableProducerStage(pipeline, new IgnoringExceptionListener());
+		this.runnable = new RunnableProducerStage(pipeline);
 	}
 
 	private OldHeadPipeline<Clock, Sink<Long>> buildClockPipeline() {
diff --git a/src/test/java/teetime/framework/StageTest.java b/src/test/java/teetime/framework/StageTest.java
index 639a806e..ea66d118 100644
--- a/src/test/java/teetime/framework/StageTest.java
+++ b/src/test/java/teetime/framework/StageTest.java
@@ -15,7 +15,10 @@
  */
 package teetime.framework;
 
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -49,6 +52,8 @@ public class StageTest {
 		TestConfig tc = new TestConfig();
 		new Analysis<TestConfig>(tc);
 		assertEquals(tc.init.owningThread, tc.delay.owningThread);
+		assertThat(tc.delay.exceptionHandler, is(notNullValue()));
+		assertEquals(tc.init.exceptionHandler, tc.delay.exceptionHandler);
 	}
 
 	private static class TestConfig extends AnalysisConfiguration {
diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java
index 8aded34b..d6237ed4 100644
--- a/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java
+++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionHandlingTest.java
@@ -18,21 +18,20 @@ package teetime.framework.exceptionHandling;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import org.junit.Ignore;
 import org.junit.Test;
 
 import teetime.framework.Analysis;
 
 public class ExceptionHandlingTest {
 
-	private Analysis analysis;
+	private Analysis<ExceptionTestConfiguration> analysis;
 
 	// @Before
 	public void newInstances() {
-		analysis = new Analysis(new ExceptionTestConfiguration(), new TestListenerFactory());
+		analysis = new Analysis<ExceptionTestConfiguration>(new ExceptionTestConfiguration(), new TestListenerFactory());
 	}
 
-	// @Test(timeout = 5000, expected = RuntimeException.class)
+	@Test(timeout = 5000, expected = RuntimeException.class)
 	public void exceptionPassingAndTermination() {
 		analysis.executeBlocking();
 		assertEquals(TestListener.exceptionInvoked, 2); // listener did not kill thread to early
@@ -49,7 +48,6 @@ public class ExceptionHandlingTest {
 	 * SpScPipe.add and cycle through the sleep method. As a result, the thread will never return to the point
 	 * where it checks if it should be terminated.
 	 */
-	@Ignore
 	@Test(timeout = 30000)
 	public void forAFewTimes() {
 		for (int i = 0; i < 1000; i++) {
-- 
GitLab