Skip to content
Snippets Groups Projects
Commit d2d2474c authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

moved exception handling from "thread head" to "throwing stage";

modified scope of Listener implementations (should only be instantiated
by factories)
parent f0ba7a6c
No related branches found
No related tags found
No related merge requests found
Showing
with 53 additions and 46 deletions
......@@ -15,6 +15,7 @@
*/
package teetime.framework;
import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
import teetime.framework.exceptionHandling.StageException;
public abstract class AbstractConsumerStage<I> extends AbstractStage {
......@@ -35,9 +36,12 @@ public abstract class AbstractConsumerStage<I> extends AbstractStage {
try {
this.execute(element);
} catch (Exception e) {
final FurtherExecution furtherExecution = exceptionHandler.onStageException(e, this);
if (furtherExecution == FurtherExecution.TERMINATE) {
throw new StageException(e, this);
}
}
}
protected abstract void execute(I element);
......
......@@ -15,6 +15,7 @@
*/
package teetime.framework;
import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
import teetime.framework.exceptionHandling.StageException;
/**
......@@ -39,9 +40,12 @@ public abstract class AbstractProducerStage<O> extends AbstractStage {
try {
this.execute();
} catch (Exception e) {
final FurtherExecution furtherExecution = this.exceptionHandler.onStageException(e, this);
if (furtherExecution == FurtherExecution.TERMINATE) {
throw new StageException(e, this);
}
}
}
@Override
public TerminationStrategy getTerminationStrategy() {
......
......@@ -18,25 +18,20 @@ package teetime.framework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.AbstractExceptionListener;
import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
import teetime.framework.exceptionHandling.StageException;
import teetime.framework.signal.TerminatingSignal;
abstract class AbstractRunnableStage implements Runnable {
private final AbstractExceptionListener exceptionHandler;
private static final String TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION = "Terminating thread due to the following exception: ";
private final Stage stage;
@SuppressWarnings("PMD.LoggerIsNotStaticFinal")
protected final Logger logger;
public AbstractRunnableStage(final Stage stage, final AbstractExceptionListener exceptionHandler) {
public AbstractRunnableStage(final Stage stage) {
this.stage = stage;
this.logger = LoggerFactory.getLogger(stage.getClass());
this.exceptionHandler = exceptionHandler;
}
@Override
......@@ -45,19 +40,13 @@ abstract class AbstractRunnableStage implements Runnable {
boolean failed = false;
try {
beforeStageExecution(stage);
do {
try {
do {
executeStage(stage);
} while (!stage.shouldBeTerminated());
} catch (StageException e) {
final FurtherExecution furtherExecution = this.exceptionHandler.onStageException(e, e.getThrowingStage());
if (furtherExecution == FurtherExecution.TERMINATE) {
this.stage.terminate();
failed = true;
}
}
} while (!stage.shouldBeTerminated());
afterStageExecution(stage);
} catch (RuntimeException e) {
......
......@@ -136,30 +136,36 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
newListener = factory.create();
switch (stage.getTerminationStrategy()) {
case BY_SIGNAL: {
final RunnableConsumerStage runnableConsumerStage = new RunnableConsumerStage(stage, newListener);
final RunnableConsumerStage runnableConsumerStage = new RunnableConsumerStage(stage);
final Thread thread = new Thread(runnableConsumerStage);
stage.setExceptionHandler(newListener);
for (Stage intraStage : intraStages) {
intraStage.setOwningThread(thread);
intraStage.setExceptionHandler(newListener);
}
this.consumerThreads.add(thread);
thread.setName(stage.getId());
break;
}
case BY_SELF_DECISION: {
final RunnableProducerStage runnable = new RunnableProducerStage(stage, newListener);
final RunnableProducerStage runnable = new RunnableProducerStage(stage);
final Thread thread = new Thread(runnable);
stage.setExceptionHandler(newListener);
for (Stage intraStage : intraStages) {
intraStage.setOwningThread(thread);
intraStage.setExceptionHandler(newListener);
}
this.finiteProducerThreads.add(thread);
thread.setName(stage.getId());
break;
}
case BY_INTERRUPT: {
final RunnableProducerStage runnable = new RunnableProducerStage(stage, newListener);
final RunnableProducerStage runnable = new RunnableProducerStage(stage);
final Thread thread = new Thread(runnable);
stage.setExceptionHandler(newListener);
for (Stage intraStage : intraStages) {
intraStage.setOwningThread(thread);
intraStage.setExceptionHandler(newListener);
}
this.infiniteProducerThreads.add(thread);
thread.setName(stage.getId());
......
......@@ -15,7 +15,6 @@
*/
package teetime.framework;
import teetime.framework.exceptionHandling.AbstractExceptionListener;
import teetime.framework.idle.IdleStrategy;
import teetime.framework.idle.YieldStrategy;
import teetime.framework.signal.ISignal;
......@@ -32,12 +31,12 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
* @param stage
* to execute within an own thread
*/
public RunnableConsumerStage(final Stage stage, final AbstractExceptionListener exceptionListener) {
this(stage, new YieldStrategy(), exceptionListener);
public RunnableConsumerStage(final Stage stage) {
this(stage, new YieldStrategy());
}
public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy, final AbstractExceptionListener exceptionListener) {
super(stage, exceptionListener);
public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy) {
super(stage);
this.inputPorts = stage.getInputPorts(); // FIXME should getInputPorts() really be defined in Stage?
}
......
......@@ -15,14 +15,13 @@
*/
package teetime.framework;
import teetime.framework.exceptionHandling.AbstractExceptionListener;
import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.TerminatingSignal;
public final class RunnableProducerStage extends AbstractRunnableStage {
public RunnableProducerStage(final Stage stage, final AbstractExceptionListener listener) {
super(stage, listener);
public RunnableProducerStage(final Stage stage) {
super(stage);
}
@Override
......
......@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.AbstractExceptionListener;
import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection;
......@@ -42,6 +43,8 @@ public abstract class Stage {
@SuppressWarnings("PMD.LoggerIsNotStaticFinal")
protected final Logger logger;
protected AbstractExceptionListener exceptionHandler;
/** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */
protected Thread owningThread;
......@@ -131,4 +134,7 @@ public abstract class Stage {
@SuppressWarnings("PMD.SignatureDeclareThrowsException")
public abstract void onTerminating() throws Exception;
protected final void setExceptionHandler(final AbstractExceptionListener exceptionHandler) {
this.exceptionHandler = exceptionHandler;
}
}
......@@ -17,7 +17,7 @@ package teetime.framework.exceptionHandling;
import teetime.framework.Stage;
public class IgnoringExceptionListener extends AbstractExceptionListener {
class IgnoringExceptionListener extends AbstractExceptionListener {
@Override
public FurtherExecution onStageException(final Exception e, final Stage throwingStage) {
......
......@@ -17,7 +17,7 @@ package teetime.framework.exceptionHandling;
import teetime.framework.Stage;
public class LoggingExceptionListener extends AbstractExceptionListener {
class LoggingExceptionListener extends AbstractExceptionListener {
@Override
public FurtherExecution onStageException(final Exception e, final Stage throwingStage) {
......
......@@ -17,7 +17,7 @@ package teetime.framework.exceptionHandling;
import teetime.framework.Stage;
public class TerminatingExceptionListener extends AbstractExceptionListener {
class TerminatingExceptionListener extends AbstractExceptionListener {
@Override
public FurtherExecution onStageException(final Exception e, final Stage throwingStage) {
......
......@@ -20,7 +20,6 @@ import java.util.List;
import teetime.framework.OldHeadPipeline;
import teetime.framework.RunnableProducerStage;
import teetime.framework.Stage;
import teetime.framework.exceptionHandling.IgnoringExceptionListener;
import teetime.framework.pipe.IPipeFactory;
import teetime.stage.CollectorSink;
import teetime.stage.NoopFilter;
......@@ -45,7 +44,7 @@ public class MethodCallThroughputAnalysis9 {
public void init(final IPipeFactory pipeFactory) {
Stage pipeline = this.buildPipeline(pipeFactory);
this.runnable = new RunnableProducerStage(pipeline, new IgnoringExceptionListener());
this.runnable = new RunnableProducerStage(pipeline);
}
/**
......
......@@ -20,7 +20,6 @@ import java.util.List;
import teetime.framework.OldHeadPipeline;
import teetime.framework.RunnableProducerStage;
import teetime.framework.Stage;
import teetime.framework.exceptionHandling.IgnoringExceptionListener;
import teetime.framework.pipe.UnorderedGrowablePipe;
import teetime.stage.CollectorSink;
import teetime.stage.NoopFilter;
......@@ -45,7 +44,7 @@ public class MethodCallThroughputAnalysis11 {
public void init() {
Stage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator);
this.runnable = new RunnableProducerStage(pipeline, new IgnoringExceptionListener());
this.runnable = new RunnableProducerStage(pipeline);
}
private OldHeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(final long numInputObjects,
......
......@@ -21,7 +21,6 @@ import teetime.framework.AnalysisConfiguration;
import teetime.framework.OldHeadPipeline;
import teetime.framework.RunnableProducerStage;
import teetime.framework.Stage;
import teetime.framework.exceptionHandling.IgnoringExceptionListener;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.OrderedGrowableArrayPipe;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
......@@ -65,10 +64,10 @@ public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration {
public void init() {
OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline();
this.clockRunnable = new RunnableProducerStage(clockPipeline, new IgnoringExceptionListener());
this.clockRunnable = new RunnableProducerStage(clockPipeline);
Stage pipeline = this.buildPipeline(this.clock);
this.runnable = new RunnableProducerStage(pipeline, new IgnoringExceptionListener());
this.runnable = new RunnableProducerStage(pipeline);
}
private OldHeadPipeline<Clock, Sink<Long>> buildClockPipeline() {
......
......@@ -15,7 +15,10 @@
*/
package teetime.framework;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import org.junit.Assert;
import org.junit.Test;
......@@ -49,6 +52,8 @@ public class StageTest {
TestConfig tc = new TestConfig();
new Analysis<TestConfig>(tc);
assertEquals(tc.init.owningThread, tc.delay.owningThread);
assertThat(tc.delay.exceptionHandler, is(notNullValue()));
assertEquals(tc.init.exceptionHandler, tc.delay.exceptionHandler);
}
private static class TestConfig extends AnalysisConfiguration {
......
......@@ -18,21 +18,20 @@ package teetime.framework.exceptionHandling;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Ignore;
import org.junit.Test;
import teetime.framework.Analysis;
public class ExceptionHandlingTest {
private Analysis analysis;
private Analysis<ExceptionTestConfiguration> analysis;
// @Before
public void newInstances() {
analysis = new Analysis(new ExceptionTestConfiguration(), new TestListenerFactory());
analysis = new Analysis<ExceptionTestConfiguration>(new ExceptionTestConfiguration(), new TestListenerFactory());
}
// @Test(timeout = 5000, expected = RuntimeException.class)
@Test(timeout = 5000, expected = RuntimeException.class)
public void exceptionPassingAndTermination() {
analysis.executeBlocking();
assertEquals(TestListener.exceptionInvoked, 2); // listener did not kill thread to early
......@@ -49,7 +48,6 @@ public class ExceptionHandlingTest {
* SpScPipe.add and cycle through the sleep method. As a result, the thread will never return to the point
* where it checks if it should be terminated.
*/
@Ignore
@Test(timeout = 30000)
public void forAFewTimes() {
for (int i = 0; i < 1000; i++) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment