Skip to content
Snippets Groups Projects
Commit dca81b1f authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

minor fixes; duplicated Analysis.start to move to a void return type

parent b76d8a09
No related branches found
No related tags found
No related merge requests found
...@@ -10,6 +10,10 @@ ...@@ -10,6 +10,10 @@
<action dev="ntd" type="add" issue="32"> <action dev="ntd" type="add" issue="32">
Introduced a generic exception handling for stages. Introduced a generic exception handling for stages.
</action> </action>
<action dev="ntd" type="update" issue="92">
Introduced new method (returns void) in Analysis, which starts the execution of the analysis.
Marked old method as deprecated.
</action>
</release> </release>
<release version="1.0" date="19.12.2014" description="Initial release"> <release version="1.0" date="19.12.2014" description="Initial release">
......
...@@ -6,6 +6,7 @@ import org.slf4j.LoggerFactory; ...@@ -6,6 +6,7 @@ import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.StageException; import teetime.framework.exceptionHandling.StageException;
import teetime.framework.exceptionHandling.StageExceptionHandler; import teetime.framework.exceptionHandling.StageExceptionHandler;
import teetime.framework.exceptionHandling.StageExceptionHandler.FurtherExecution; import teetime.framework.exceptionHandling.StageExceptionHandler.FurtherExecution;
import teetime.framework.signal.TerminatingSignal;
abstract class AbstractRunnableStage implements Runnable { abstract class AbstractRunnableStage implements Runnable {
...@@ -52,6 +53,14 @@ abstract class AbstractRunnableStage implements Runnable { ...@@ -52,6 +53,14 @@ abstract class AbstractRunnableStage implements Runnable {
this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")"); this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")");
if (failed) { if (failed) {
if (stage.getTerminationStrategy() == TerminationStrategy.BY_SIGNAL) {
TerminatingSignal signal = new TerminatingSignal();
// TODO: Check if this is really needed... it seems like signals are passed on after their first arrival
InputPort<?>[] inputPorts = stage.getInputPorts();
for (int i = 0; i < inputPorts.length; i++) {
stage.onSignal(signal, inputPorts[i]);
}
}
throw new IllegalStateException("Terminated by StageExceptionListener"); throw new IllegalStateException("Terminated by StageExceptionListener");
} }
} }
......
...@@ -11,7 +11,6 @@ import org.slf4j.LoggerFactory; ...@@ -11,7 +11,6 @@ import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.IgnoringStageListener; import teetime.framework.exceptionHandling.IgnoringStageListener;
import teetime.framework.exceptionHandling.StageExceptionHandler; import teetime.framework.exceptionHandling.StageExceptionHandler;
import teetime.framework.signal.TerminatingSignal;
import teetime.framework.signal.ValidatingSignal; import teetime.framework.signal.ValidatingSignal;
import teetime.framework.validation.AnalysisNotValidException; import teetime.framework.validation.AnalysisNotValidException;
import teetime.util.Pair; import teetime.util.Pair;
...@@ -140,8 +139,11 @@ public class Analysis implements UncaughtExceptionHandler { ...@@ -140,8 +139,11 @@ public class Analysis implements UncaughtExceptionHandler {
* This method will start the Analysis and all containing stages. * This method will start the Analysis and all containing stages.
* *
* @return a collection of thread/throwable pairs * @return a collection of thread/throwable pairs
*
* @deprecated As of release 1.1, replaced by {@link #execute()}
*/ */
public void start() { @Deprecated
public Collection<Pair<Thread, Throwable>> start() {
// start analysis // start analysis
startThreads(this.consumerThreads); startThreads(this.consumerThreads);
startThreads(this.finiteProducerThreads); startThreads(this.finiteProducerThreads);
...@@ -170,10 +172,18 @@ public class Analysis implements UncaughtExceptionHandler { ...@@ -170,10 +172,18 @@ public class Analysis implements UncaughtExceptionHandler {
for (Thread thread : this.infiniteProducerThreads) { for (Thread thread : this.infiniteProducerThreads) {
thread.interrupt(); thread.interrupt();
} }
return this.exceptions;
}
/**
* This method will start the Analysis and all containing stages.
*/
public void execute() {
start();
if (!exceptions.isEmpty()) { if (!exceptions.isEmpty()) {
throw new RuntimeException("Errors while running analysis"); // TODO: add exceptions throw new RuntimeException("Errors while running analysis"); // TODO: add exceptions
} }
// return this.exceptions;
} }
private void startThreads(final Iterable<Thread> threads) { private void startThreads(final Iterable<Thread> threads) {
...@@ -199,15 +209,8 @@ public class Analysis implements UncaughtExceptionHandler { ...@@ -199,15 +209,8 @@ public class Analysis implements UncaughtExceptionHandler {
LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now."); LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now.");
for (Stage stage : configuration.getThreadableStageJobs()) { for (Stage stage : configuration.getThreadableStageJobs()) {
if (stage.getOwningThread() != thread) { if (stage.getOwningThread() != thread) {
switch (stage.getTerminationStrategy()) { if (stage.getTerminationStrategy() == TerminationStrategy.BY_SELF_DECISION) {
case BY_SELF_DECISION: { stage.terminate();
stage.terminate(); // onSignal would also work, but this will execute in its own Thread
}
case BY_SIGNAL: {
final TerminatingSignal terminatingSignal = new TerminatingSignal();
stage.onSignal(terminatingSignal, null);
}
default:
} }
} }
} }
......
wiki @ a9358190
Subproject commit 0e4474577e1f49bc96e734c286b2d9e0363895e8 Subproject commit a93581905ef7b0584d52eae1898148ffa6201a31
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment