diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index 75485f6a0b678fa1018697dc8598de4d91dbe643..f77315e5a05a1cd2a5734ea783402dac0820b953 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -9,6 +9,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import teetime.framework.exceptionHandling.DefaultListener; +import teetime.framework.exceptionHandling.StageExceptionListener; import teetime.framework.signal.ValidatingSignal; import teetime.framework.validation.AnalysisNotValidException; import teetime.util.Pair; @@ -26,6 +28,8 @@ public class Analysis implements UncaughtExceptionHandler { private final AnalysisConfiguration configuration; + private final StageExceptionListener listener; + private final List<Thread> consumerThreads = new LinkedList<Thread>(); private final List<Thread> finiteProducerThreads = new LinkedList<Thread>(); private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>(); @@ -39,11 +43,20 @@ public class Analysis implements UncaughtExceptionHandler { * to be used for the analysis */ public Analysis(final AnalysisConfiguration configuration) { - this(configuration, false); + this(configuration, false, new DefaultListener()); } public Analysis(final AnalysisConfiguration configuration, final boolean validationEnabled) { + this(configuration, validationEnabled, new DefaultListener()); + } + + public Analysis(final AnalysisConfiguration configuration, final StageExceptionListener listener) { + this(configuration, false, listener); + } + + public Analysis(final AnalysisConfiguration configuration, final boolean validationEnabled, final StageExceptionListener listener) { this.configuration = configuration; + this.listener = listener; if (validationEnabled) { validateStages(); } @@ -70,13 +83,22 @@ public class Analysis implements UncaughtExceptionHandler { public void init() { final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); for (Stage stage : threadableStageJobs) { + StageExceptionListener newListener; + try { + newListener = listener.getClass().newInstance(); + } catch (InstantiationException e) { + throw new IllegalStateException(e); + } catch (IllegalAccessException e) { + throw new IllegalStateException(e); + } switch (stage.getTerminationStrategy()) { case BY_SIGNAL: { - RunnableConsumerStage runnable; + RunnableConsumerStage runnable = null; + newListener.setHeadStage(runnable); if (stage instanceof AbstractConsumerStage<?>) { - runnable = new RunnableConsumerStage(stage, ((AbstractConsumerStage<?>) stage).getIdleStrategy()); // FIXME remove this word-around + runnable = new RunnableConsumerStage(stage, ((AbstractConsumerStage<?>) stage).getIdleStrategy(), newListener); // FIXME remove this word-around } else { - runnable = new RunnableConsumerStage(stage); + runnable = new RunnableConsumerStage(stage, newListener); } final Thread thread = new Thread(runnable); stage.setOwningThread(thread); @@ -84,13 +106,19 @@ public class Analysis implements UncaughtExceptionHandler { break; } case BY_SELF_DECISION: { - final Thread thread = new Thread(new RunnableProducerStage(stage)); + RunnableProducerStage runnable = null; + newListener.setHeadStage(runnable); + runnable = new RunnableProducerStage(stage, newListener); + final Thread thread = new Thread(runnable); stage.setOwningThread(thread); this.finiteProducerThreads.add(thread); break; } case BY_INTERRUPT: { - final Thread thread = new Thread(new RunnableProducerStage(stage)); + RunnableProducerStage runnable = null; + newListener.setHeadStage(runnable); + runnable = new RunnableProducerStage(stage, newListener); + final Thread thread = new Thread(runnable); stage.setOwningThread(thread); this.infiniteProducerThreads.add(thread); break; diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index a68b31024fd013d3e3c4e2fdf613a6fe794c4f83..3412e93289aa64d49942a22e7922b247766283c1 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -2,6 +2,7 @@ package teetime.framework; import java.util.Arrays; +import teetime.framework.exceptionHandling.StageExceptionListener; import teetime.framework.idle.IdleStrategy; import teetime.framework.idle.YieldStrategy; import teetime.framework.pipe.IPipe; @@ -13,16 +14,16 @@ final class RunnableConsumerStage extends RunnableStage { /** * Creates a new instance with the {@link YieldStrategy} as default idle strategy. - * + * * @param stage * to execute within an own thread */ - public RunnableConsumerStage(final Stage stage) { - this(stage, new YieldStrategy()); + public RunnableConsumerStage(final Stage stage, final StageExceptionListener exceptionListener) { + this(stage, new YieldStrategy(), exceptionListener); } - public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy) { - super(stage); + public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy, final StageExceptionListener exceptionListener) { + super(stage, exceptionListener); this.idleStrategy = idleStrategy; } diff --git a/src/main/java/teetime/framework/RunnableProducerStage.java b/src/main/java/teetime/framework/RunnableProducerStage.java index a2941b066f0fd794781a07e99d5f6f6820366bc9..96411a74a38a84de3529585c54f057d6a8e39d93 100644 --- a/src/main/java/teetime/framework/RunnableProducerStage.java +++ b/src/main/java/teetime/framework/RunnableProducerStage.java @@ -1,12 +1,13 @@ package teetime.framework; +import teetime.framework.exceptionHandling.StageExceptionListener; import teetime.framework.signal.StartingSignal; import teetime.framework.signal.TerminatingSignal; public final class RunnableProducerStage extends RunnableStage { - public RunnableProducerStage(final Stage stage) { - super(stage); + public RunnableProducerStage(final Stage stage, final StageExceptionListener listener) { + super(stage, listener); } @Override diff --git a/src/main/java/teetime/framework/RunnableStage.java b/src/main/java/teetime/framework/RunnableStage.java index ed98055c298e878ebf21cf7613be5dd23292b09a..44fe489aca48848f78aed30530752e271513959a 100644 --- a/src/main/java/teetime/framework/RunnableStage.java +++ b/src/main/java/teetime/framework/RunnableStage.java @@ -3,17 +3,28 @@ package teetime.framework; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import teetime.framework.exceptionHandling.DefaultListener; import teetime.framework.exceptionHandling.StageException; +import teetime.framework.exceptionHandling.StageExceptionListener; -abstract class RunnableStage implements Runnable { +public abstract class RunnableStage implements Runnable { protected final Stage stage; @SuppressWarnings("PMD.LoggerIsNotStaticFinal") protected final Logger logger; + private final StageExceptionListener listener; public RunnableStage(final Stage stage) { this.stage = stage; this.logger = LoggerFactory.getLogger(stage.getClass()); + this.listener = new DefaultListener(); + listener.setHeadStage(this); + } + + public RunnableStage(final Stage stage, final StageExceptionListener exceptionListener) { + this.stage = stage; + this.logger = LoggerFactory.getLogger(stage.getClass()); + this.listener = exceptionListener; } @Override @@ -27,7 +38,7 @@ abstract class RunnableStage implements Runnable { try { executeStage(); } catch (StageException e) { - // TODO: handle exception + this.listener.onStageException(e, e.getThrowingStage()); } } while (!this.stage.shouldBeTerminated()); @@ -44,6 +55,11 @@ abstract class RunnableStage implements Runnable { this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")"); } + public final void abortExecution() { + this.stage.terminate(); + // TODO: flag error and throw exception + } + protected abstract void beforeStageExecution(); protected abstract void executeStage(); diff --git a/src/main/java/teetime/framework/exceptionHandling/DefaultListener.java b/src/main/java/teetime/framework/exceptionHandling/DefaultListener.java new file mode 100644 index 0000000000000000000000000000000000000000..26c2cfd2e22a5bc66b67d6fc394a3243148efaa0 --- /dev/null +++ b/src/main/java/teetime/framework/exceptionHandling/DefaultListener.java @@ -0,0 +1,16 @@ +package teetime.framework.exceptionHandling; + +import teetime.framework.Stage; + +public class DefaultListener extends StageExceptionListener { + + public DefaultListener() { + super(); + // TODO Auto-generated constructor stub + } + + @Override + public void onStageException(final Exception e, final Stage throwingStage) { + // TODO Auto-generated method stub + } +} diff --git a/src/main/java/teetime/framework/exceptionHandling/StageExceptionListener.java b/src/main/java/teetime/framework/exceptionHandling/StageExceptionListener.java index f5ebc5d345dc850a0070b1518c32c4241e14b3b3..5efc432f56767c70fe83ce5c2b7f538a7c884c7b 100644 --- a/src/main/java/teetime/framework/exceptionHandling/StageExceptionListener.java +++ b/src/main/java/teetime/framework/exceptionHandling/StageExceptionListener.java @@ -1,17 +1,26 @@ package teetime.framework.exceptionHandling; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import teetime.framework.RunnableStage; import teetime.framework.Stage; /** * Represent a minimalistic StageExceptionListener. Listener which extend from this one, must a least implement this functionality. - * + * This abstract class provides a Logger {@link #logger} and a method to terminate the threads execution {@link #terminateExecution()}. */ public abstract class StageExceptionListener { - private final Thread thread; + private RunnableStage runnable; + + /** + * The default logger, which can be used by all subclasses + */ + protected final Logger logger; - public StageExceptionListener(final Thread thread) { - this.thread = thread; + public StageExceptionListener() { + this.logger = LoggerFactory.getLogger(this.getClass().getCanonicalName()); } /** @@ -25,12 +34,13 @@ public abstract class StageExceptionListener { public abstract void onStageException(Exception e, Stage throwingStage); /** - * Retrieves the thread in which the exception occurred. - * - * @return exception throwing thread + * This method can be used to terminate the execution of the thread. */ - public Thread getThread() { - return thread; + protected final void terminateExecution() { + this.runnable.abortExecution(); } + public final void setHeadStage(final RunnableStage headStage) { + this.runnable = headStage; + } } diff --git a/src/performancetest/java/teetime/examples/experiment09pipeimpls/MethodCallThroughputAnalysis9.java b/src/performancetest/java/teetime/examples/experiment09pipeimpls/MethodCallThroughputAnalysis9.java index 718b3295c00e56e6c2caa89dff663e2c0425a62e..dd3388babe592f333fa65c006d53d0dd8673ec73 100644 --- a/src/performancetest/java/teetime/examples/experiment09pipeimpls/MethodCallThroughputAnalysis9.java +++ b/src/performancetest/java/teetime/examples/experiment09pipeimpls/MethodCallThroughputAnalysis9.java @@ -20,6 +20,7 @@ import java.util.List; import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableProducerStage; import teetime.framework.Stage; +import teetime.framework.exceptionHandling.DefaultListener; import teetime.framework.pipe.IPipeFactory; import teetime.stage.CollectorSink; import teetime.stage.NoopFilter; @@ -44,7 +45,7 @@ public class MethodCallThroughputAnalysis9 { public void init(final IPipeFactory pipeFactory) { Stage pipeline = this.buildPipeline(pipeFactory); - this.runnable = new RunnableProducerStage(pipeline); + this.runnable = new RunnableProducerStage(pipeline, new DefaultListener()); } /** diff --git a/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java b/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java index 22618b7f33853f89f9e6db544bd58c24168a2c6c..80b9e282794d796653b49a8d5e75c28c8a823053 100644 --- a/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java +++ b/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java @@ -17,9 +17,10 @@ package teetime.examples.experiment11; import java.util.List; -import teetime.framework.Stage; import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableProducerStage; +import teetime.framework.Stage; +import teetime.framework.exceptionHandling.DefaultListener; import teetime.framework.pipe.UnorderedGrowablePipe; import teetime.stage.CollectorSink; import teetime.stage.NoopFilter; @@ -44,7 +45,7 @@ public class MethodCallThroughputAnalysis11 { public void init() { Stage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator); - this.runnable = new RunnableProducerStage(pipeline); + this.runnable = new RunnableProducerStage(pipeline, new DefaultListener()); } private OldHeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(final long numInputObjects, diff --git a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java index fcde43930f195403b1e9f759733e706de6857dd4..71033ec2c5e51e5ecfa53ff97adcfc8b3237ccf9 100644 --- a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java +++ b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java @@ -21,6 +21,7 @@ import teetime.framework.AnalysisConfiguration; import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableProducerStage; import teetime.framework.Stage; +import teetime.framework.exceptionHandling.DefaultListener; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.OrderedGrowableArrayPipe; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; @@ -64,10 +65,10 @@ public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration { public void init() { OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline(); - this.clockRunnable = new RunnableProducerStage(clockPipeline); + this.clockRunnable = new RunnableProducerStage(clockPipeline, new DefaultListener()); Stage pipeline = this.buildPipeline(this.clock); - this.runnable = new RunnableProducerStage(pipeline); + this.runnable = new RunnableProducerStage(pipeline, new DefaultListener()); } private OldHeadPipeline<Clock, Sink<Long>> buildClockPipeline() {