Skip to content
Snippets Groups Projects
Commit 92d53a75 authored by Christian Wulf's avatar Christian Wulf
Browse files

fix NPE

parent 9e48d25a
No related branches found
No related tags found
No related merge requests found
......@@ -19,7 +19,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.StageException;
import teetime.framework.signal.TerminatingSignal;
abstract class AbstractRunnableStage implements Runnable {
......@@ -39,7 +38,7 @@ abstract class AbstractRunnableStage implements Runnable {
@Override
public final void run() {
this.logger.debug("Executing runnable stage...");
boolean failed = false;
// StageException failedException = null;
try {
beforeStageExecution();
try {
......@@ -48,9 +47,11 @@ abstract class AbstractRunnableStage implements Runnable {
} while (!stage.shouldBeTerminated());
} catch (StageException e) {
this.stage.terminate();
failed = true;
// failedException = e;
throw e;
} finally {
afterStageExecution();
}
afterStageExecution();
} catch (RuntimeException e) {
this.logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e);
......@@ -60,24 +61,26 @@ abstract class AbstractRunnableStage implements Runnable {
}
this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")");
if (failed) {
sendTerminatingSignal();
throw new IllegalStateException("Terminated by StageExceptionListener");
}
// if (failedException != null) {
// sendTerminatingSignal();
// // throw new IllegalStateException("Terminated by StageExceptionListener", failedException);
// throw failedException;
// }
// normal and exceptional termination
// stage.owningContext.getThreadCounter().dec();
}
private void sendTerminatingSignal() {
if (stage.getTerminationStrategy() == TerminationStrategy.BY_SIGNAL) {
TerminatingSignal signal = new TerminatingSignal();
// TODO: Check if this is really needed... it seems like signals are passed on after their first arrival
for (InputPort<?> inputPort : stage.getInputPorts()) {
stage.onSignal(signal, inputPort);
}
}
}
//
// private void sendTerminatingSignal() {
// if (stage.getTerminationStrategy() == TerminationStrategy.BY_SIGNAL) {
// TerminatingSignal signal = new TerminatingSignal();
// // TODO: Check if this is really needed... it seems like signals are passed on after their first arrival
// for (InputPort<?> inputPort : stage.getInputPorts()) {
// stage.onSignal(signal, inputPort);
// }
// }
// }
protected abstract void beforeStageExecution() throws InterruptedException;
......
......@@ -32,12 +32,12 @@ public class ExecutionException extends RuntimeException {
private final Collection<ThreadThrowableContainer> exceptions;
public ExecutionException(final Collection<ThreadThrowableContainer> exceptions) {
super("Error(s) while execution. Check thrown exception(s).");
super((exceptions.size() == 1) ? exceptions.toString() : "Error(s) while execution. Check thrown exception(s).");
this.exceptions = exceptions;
}
/**
* Returns all exceptions thrown within the analysis.
* Returns all exceptions thrown within the execution.
* These are passed on as pairs of threads and throwables, to indicate a exception's context.
*
* @return a collection of pairs
......
......@@ -19,7 +19,7 @@ import teetime.framework.Stage;
/**
* Represents an Exception, which is thrown by stages in case of theyimport teetime.framework.Stage;
original exception, which was thrown, call {@link #getCause()}. {@link #getThrowingStage()} returns the stage, which has thrown the original exception.
* original exception, which was thrown, call {@link #getCause()}. {@link #getThrowingStage()} returns the stage, which has thrown the original exception.
*
* @since 1.1
*/
......@@ -46,4 +46,9 @@ public class StageException extends RuntimeException {
return throwingStage;
}
@Override
public String toString() {
return getCause() + " in " + throwingStage.getId();
}
}
......@@ -31,6 +31,11 @@ public class RemovePortAction<T> implements PortAction<DynamicDistributor<T>> {
@Override
public void execute(final DynamicDistributor<T> dynamicDistributor) {
dynamicDistributor.removeDynamicPort(outputPort);
DynamicOutputPort<T> realOutputPort = outputPort;
if (outputPort instanceof PortContainer) { // BETTER replace test-specific code and abstract appropriately
realOutputPort = ((PortContainer<T>) outputPort).getPort();
}
dynamicDistributor.removeDynamicPort(realOutputPort);
}
}
......@@ -78,7 +78,8 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy {
public void onPortRemoved(final OutputPort<?> removedOutputPort) {
Distributor<?> distributor = (Distributor<?>) removedOutputPort.getOwningStage();
// correct the index if it is out-of-bounds
this.index = this.index % distributor.getOutputPorts().size();
List<OutputPort<?>> outputPorts = distributor.getOutputPorts();
this.index = this.index % outputPorts.size();
}
}
......@@ -17,12 +17,12 @@ package teetime.util;
public final class ThreadThrowableContainer {
private final Thread first;
private final Throwable second;
private final Thread thread;
private final Throwable throwable;
public ThreadThrowableContainer(final Thread first, final Throwable second) {
this.first = first;
this.second = second;
this.thread = first;
this.throwable = second;
}
public static ThreadThrowableContainer of(final Thread first, final Throwable second) {
......@@ -30,16 +30,16 @@ public final class ThreadThrowableContainer {
}
public Thread getThread() {
return this.first;
return this.thread;
}
public Throwable getThrowable() {
return this.second;
return this.throwable;
}
@Override
public String toString() {
return second.getClass().getName() + " in " + getThread() + ": " + second.getLocalizedMessage();
return throwable + " in " + thread;
}
}
......@@ -30,6 +30,9 @@ public class PortList<T extends AbstractPort<?>> {
public boolean remove(final T port) {
boolean removed = openedPorts.remove(port); // BETTER remove by index for performance reasons
firePortRemoved(port);
if (!removed) {
throw new IllegalStateException();
}
return removed;
}
......
......@@ -26,4 +26,8 @@ class PortContainer<T> extends DynamicOutputPort<T> {
this.port = port;
}
public DynamicOutputPort<T> getPort() {
return port;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment