Skip to content
Snippets Groups Projects
Commit e1a6d312 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

first (more or less) complete prototype

parent 1f6379a3
Branches
Tags
No related merge requests found
Showing with 104 additions and 29 deletions
...@@ -9,6 +9,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; ...@@ -9,6 +9,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.DefaultListener;
import teetime.framework.exceptionHandling.StageExceptionListener;
import teetime.framework.signal.ValidatingSignal; import teetime.framework.signal.ValidatingSignal;
import teetime.framework.validation.AnalysisNotValidException; import teetime.framework.validation.AnalysisNotValidException;
import teetime.util.Pair; import teetime.util.Pair;
...@@ -26,6 +28,8 @@ public class Analysis implements UncaughtExceptionHandler { ...@@ -26,6 +28,8 @@ public class Analysis implements UncaughtExceptionHandler {
private final AnalysisConfiguration configuration; private final AnalysisConfiguration configuration;
private final StageExceptionListener listener;
private final List<Thread> consumerThreads = new LinkedList<Thread>(); private final List<Thread> consumerThreads = new LinkedList<Thread>();
private final List<Thread> finiteProducerThreads = new LinkedList<Thread>(); private final List<Thread> finiteProducerThreads = new LinkedList<Thread>();
private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>(); private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>();
...@@ -39,11 +43,20 @@ public class Analysis implements UncaughtExceptionHandler { ...@@ -39,11 +43,20 @@ public class Analysis implements UncaughtExceptionHandler {
* to be used for the analysis * to be used for the analysis
*/ */
public Analysis(final AnalysisConfiguration configuration) { public Analysis(final AnalysisConfiguration configuration) {
this(configuration, false); this(configuration, false, new DefaultListener());
} }
public Analysis(final AnalysisConfiguration configuration, final boolean validationEnabled) { public Analysis(final AnalysisConfiguration configuration, final boolean validationEnabled) {
this(configuration, validationEnabled, new DefaultListener());
}
public Analysis(final AnalysisConfiguration configuration, final StageExceptionListener listener) {
this(configuration, false, listener);
}
public Analysis(final AnalysisConfiguration configuration, final boolean validationEnabled, final StageExceptionListener listener) {
this.configuration = configuration; this.configuration = configuration;
this.listener = listener;
if (validationEnabled) { if (validationEnabled) {
validateStages(); validateStages();
} }
...@@ -70,13 +83,22 @@ public class Analysis implements UncaughtExceptionHandler { ...@@ -70,13 +83,22 @@ public class Analysis implements UncaughtExceptionHandler {
public void init() { public void init() {
final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs();
for (Stage stage : threadableStageJobs) { for (Stage stage : threadableStageJobs) {
StageExceptionListener newListener;
try {
newListener = listener.getClass().newInstance();
} catch (InstantiationException e) {
throw new IllegalStateException(e);
} catch (IllegalAccessException e) {
throw new IllegalStateException(e);
}
switch (stage.getTerminationStrategy()) { switch (stage.getTerminationStrategy()) {
case BY_SIGNAL: { case BY_SIGNAL: {
RunnableConsumerStage runnable; RunnableConsumerStage runnable = null;
newListener.setHeadStage(runnable);
if (stage instanceof AbstractConsumerStage<?>) { if (stage instanceof AbstractConsumerStage<?>) {
runnable = new RunnableConsumerStage(stage, ((AbstractConsumerStage<?>) stage).getIdleStrategy()); // FIXME remove this word-around runnable = new RunnableConsumerStage(stage, ((AbstractConsumerStage<?>) stage).getIdleStrategy(), newListener); // FIXME remove this word-around
} else { } else {
runnable = new RunnableConsumerStage(stage); runnable = new RunnableConsumerStage(stage, newListener);
} }
final Thread thread = new Thread(runnable); final Thread thread = new Thread(runnable);
stage.setOwningThread(thread); stage.setOwningThread(thread);
...@@ -84,13 +106,19 @@ public class Analysis implements UncaughtExceptionHandler { ...@@ -84,13 +106,19 @@ public class Analysis implements UncaughtExceptionHandler {
break; break;
} }
case BY_SELF_DECISION: { case BY_SELF_DECISION: {
final Thread thread = new Thread(new RunnableProducerStage(stage)); RunnableProducerStage runnable = null;
newListener.setHeadStage(runnable);
runnable = new RunnableProducerStage(stage, newListener);
final Thread thread = new Thread(runnable);
stage.setOwningThread(thread); stage.setOwningThread(thread);
this.finiteProducerThreads.add(thread); this.finiteProducerThreads.add(thread);
break; break;
} }
case BY_INTERRUPT: { case BY_INTERRUPT: {
final Thread thread = new Thread(new RunnableProducerStage(stage)); RunnableProducerStage runnable = null;
newListener.setHeadStage(runnable);
runnable = new RunnableProducerStage(stage, newListener);
final Thread thread = new Thread(runnable);
stage.setOwningThread(thread); stage.setOwningThread(thread);
this.infiniteProducerThreads.add(thread); this.infiniteProducerThreads.add(thread);
break; break;
......
...@@ -2,6 +2,7 @@ package teetime.framework; ...@@ -2,6 +2,7 @@ package teetime.framework;
import java.util.Arrays; import java.util.Arrays;
import teetime.framework.exceptionHandling.StageExceptionListener;
import teetime.framework.idle.IdleStrategy; import teetime.framework.idle.IdleStrategy;
import teetime.framework.idle.YieldStrategy; import teetime.framework.idle.YieldStrategy;
import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipe;
...@@ -17,12 +18,12 @@ final class RunnableConsumerStage extends RunnableStage { ...@@ -17,12 +18,12 @@ final class RunnableConsumerStage extends RunnableStage {
* @param stage * @param stage
* to execute within an own thread * to execute within an own thread
*/ */
public RunnableConsumerStage(final Stage stage) { public RunnableConsumerStage(final Stage stage, final StageExceptionListener exceptionListener) {
this(stage, new YieldStrategy()); this(stage, new YieldStrategy(), exceptionListener);
} }
public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy) { public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy, final StageExceptionListener exceptionListener) {
super(stage); super(stage, exceptionListener);
this.idleStrategy = idleStrategy; this.idleStrategy = idleStrategy;
} }
......
package teetime.framework; package teetime.framework;
import teetime.framework.exceptionHandling.StageExceptionListener;
import teetime.framework.signal.StartingSignal; import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.TerminatingSignal; import teetime.framework.signal.TerminatingSignal;
public final class RunnableProducerStage extends RunnableStage { public final class RunnableProducerStage extends RunnableStage {
public RunnableProducerStage(final Stage stage) { public RunnableProducerStage(final Stage stage, final StageExceptionListener listener) {
super(stage); super(stage, listener);
} }
@Override @Override
......
...@@ -3,17 +3,28 @@ package teetime.framework; ...@@ -3,17 +3,28 @@ package teetime.framework;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.DefaultListener;
import teetime.framework.exceptionHandling.StageException; import teetime.framework.exceptionHandling.StageException;
import teetime.framework.exceptionHandling.StageExceptionListener;
abstract class RunnableStage implements Runnable { public abstract class RunnableStage implements Runnable {
protected final Stage stage; protected final Stage stage;
@SuppressWarnings("PMD.LoggerIsNotStaticFinal") @SuppressWarnings("PMD.LoggerIsNotStaticFinal")
protected final Logger logger; protected final Logger logger;
private final StageExceptionListener listener;
public RunnableStage(final Stage stage) { public RunnableStage(final Stage stage) {
this.stage = stage; this.stage = stage;
this.logger = LoggerFactory.getLogger(stage.getClass()); this.logger = LoggerFactory.getLogger(stage.getClass());
this.listener = new DefaultListener();
listener.setHeadStage(this);
}
public RunnableStage(final Stage stage, final StageExceptionListener exceptionListener) {
this.stage = stage;
this.logger = LoggerFactory.getLogger(stage.getClass());
this.listener = exceptionListener;
} }
@Override @Override
...@@ -27,7 +38,7 @@ abstract class RunnableStage implements Runnable { ...@@ -27,7 +38,7 @@ abstract class RunnableStage implements Runnable {
try { try {
executeStage(); executeStage();
} catch (StageException e) { } catch (StageException e) {
// TODO: handle exception this.listener.onStageException(e, e.getThrowingStage());
} }
} while (!this.stage.shouldBeTerminated()); } while (!this.stage.shouldBeTerminated());
...@@ -44,6 +55,11 @@ abstract class RunnableStage implements Runnable { ...@@ -44,6 +55,11 @@ abstract class RunnableStage implements Runnable {
this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")"); this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")");
} }
public final void abortExecution() {
this.stage.terminate();
// TODO: flag error and throw exception
}
protected abstract void beforeStageExecution(); protected abstract void beforeStageExecution();
protected abstract void executeStage(); protected abstract void executeStage();
......
package teetime.framework.exceptionHandling;
import teetime.framework.Stage;
public class DefaultListener extends StageExceptionListener {
public DefaultListener() {
super();
// TODO Auto-generated constructor stub
}
@Override
public void onStageException(final Exception e, final Stage throwingStage) {
// TODO Auto-generated method stub
}
}
package teetime.framework.exceptionHandling; package teetime.framework.exceptionHandling;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.RunnableStage;
import teetime.framework.Stage; import teetime.framework.Stage;
/** /**
* Represent a minimalistic StageExceptionListener. Listener which extend from this one, must a least implement this functionality. * Represent a minimalistic StageExceptionListener. Listener which extend from this one, must a least implement this functionality.
* * This abstract class provides a Logger {@link #logger} and a method to terminate the threads execution {@link #terminateExecution()}.
*/ */
public abstract class StageExceptionListener { public abstract class StageExceptionListener {
private final Thread thread; private RunnableStage runnable;
public StageExceptionListener(final Thread thread) { /**
this.thread = thread; * The default logger, which can be used by all subclasses
*/
protected final Logger logger;
public StageExceptionListener() {
this.logger = LoggerFactory.getLogger(this.getClass().getCanonicalName());
} }
/** /**
...@@ -25,12 +34,13 @@ public abstract class StageExceptionListener { ...@@ -25,12 +34,13 @@ public abstract class StageExceptionListener {
public abstract void onStageException(Exception e, Stage throwingStage); public abstract void onStageException(Exception e, Stage throwingStage);
/** /**
* Retrieves the thread in which the exception occurred. * This method can be used to terminate the execution of the thread.
*
* @return exception throwing thread
*/ */
public Thread getThread() { protected final void terminateExecution() {
return thread; this.runnable.abortExecution();
} }
public final void setHeadStage(final RunnableStage headStage) {
this.runnable = headStage;
}
} }
...@@ -20,6 +20,7 @@ import java.util.List; ...@@ -20,6 +20,7 @@ import java.util.List;
import teetime.framework.OldHeadPipeline; import teetime.framework.OldHeadPipeline;
import teetime.framework.RunnableProducerStage; import teetime.framework.RunnableProducerStage;
import teetime.framework.Stage; import teetime.framework.Stage;
import teetime.framework.exceptionHandling.DefaultListener;
import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.IPipeFactory;
import teetime.stage.CollectorSink; import teetime.stage.CollectorSink;
import teetime.stage.NoopFilter; import teetime.stage.NoopFilter;
...@@ -44,7 +45,7 @@ public class MethodCallThroughputAnalysis9 { ...@@ -44,7 +45,7 @@ public class MethodCallThroughputAnalysis9 {
public void init(final IPipeFactory pipeFactory) { public void init(final IPipeFactory pipeFactory) {
Stage pipeline = this.buildPipeline(pipeFactory); Stage pipeline = this.buildPipeline(pipeFactory);
this.runnable = new RunnableProducerStage(pipeline); this.runnable = new RunnableProducerStage(pipeline, new DefaultListener());
} }
/** /**
......
...@@ -17,9 +17,10 @@ package teetime.examples.experiment11; ...@@ -17,9 +17,10 @@ package teetime.examples.experiment11;
import java.util.List; import java.util.List;
import teetime.framework.Stage;
import teetime.framework.OldHeadPipeline; import teetime.framework.OldHeadPipeline;
import teetime.framework.RunnableProducerStage; import teetime.framework.RunnableProducerStage;
import teetime.framework.Stage;
import teetime.framework.exceptionHandling.DefaultListener;
import teetime.framework.pipe.UnorderedGrowablePipe; import teetime.framework.pipe.UnorderedGrowablePipe;
import teetime.stage.CollectorSink; import teetime.stage.CollectorSink;
import teetime.stage.NoopFilter; import teetime.stage.NoopFilter;
...@@ -44,7 +45,7 @@ public class MethodCallThroughputAnalysis11 { ...@@ -44,7 +45,7 @@ public class MethodCallThroughputAnalysis11 {
public void init() { public void init() {
Stage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator); Stage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator);
this.runnable = new RunnableProducerStage(pipeline); this.runnable = new RunnableProducerStage(pipeline, new DefaultListener());
} }
private OldHeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(final long numInputObjects, private OldHeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(final long numInputObjects,
......
...@@ -21,6 +21,7 @@ import teetime.framework.AnalysisConfiguration; ...@@ -21,6 +21,7 @@ import teetime.framework.AnalysisConfiguration;
import teetime.framework.OldHeadPipeline; import teetime.framework.OldHeadPipeline;
import teetime.framework.RunnableProducerStage; import teetime.framework.RunnableProducerStage;
import teetime.framework.Stage; import teetime.framework.Stage;
import teetime.framework.exceptionHandling.DefaultListener;
import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.OrderedGrowableArrayPipe; import teetime.framework.pipe.OrderedGrowableArrayPipe;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
...@@ -64,10 +65,10 @@ public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration { ...@@ -64,10 +65,10 @@ public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration {
public void init() { public void init() {
OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline(); OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline();
this.clockRunnable = new RunnableProducerStage(clockPipeline); this.clockRunnable = new RunnableProducerStage(clockPipeline, new DefaultListener());
Stage pipeline = this.buildPipeline(this.clock); Stage pipeline = this.buildPipeline(this.clock);
this.runnable = new RunnableProducerStage(pipeline); this.runnable = new RunnableProducerStage(pipeline, new DefaultListener());
} }
private OldHeadPipeline<Clock, Sink<Long>> buildClockPipeline() { private OldHeadPipeline<Clock, Sink<Long>> buildClockPipeline() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment