diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java index 379181a9d2bf548b0c49d5c7f299952f4165e56c..cb9eaa618dc754026bbc1e62fd9d977c6a1f7986 100644 --- a/src/main/java/teetime/framework/AbstractRunnableStage.java +++ b/src/main/java/teetime/framework/AbstractRunnableStage.java @@ -19,7 +19,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import teetime.framework.exceptionHandling.StageException; -import teetime.framework.signal.TerminatingSignal; abstract class AbstractRunnableStage implements Runnable { @@ -39,7 +38,7 @@ abstract class AbstractRunnableStage implements Runnable { @Override public final void run() { this.logger.debug("Executing runnable stage..."); - boolean failed = false; + // StageException failedException = null; try { beforeStageExecution(); try { @@ -48,9 +47,11 @@ abstract class AbstractRunnableStage implements Runnable { } while (!stage.shouldBeTerminated()); } catch (StageException e) { this.stage.terminate(); - failed = true; + // failedException = e; + throw e; + } finally { + afterStageExecution(); } - afterStageExecution(); } catch (RuntimeException e) { this.logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e); @@ -60,24 +61,26 @@ abstract class AbstractRunnableStage implements Runnable { } this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")"); - if (failed) { - sendTerminatingSignal(); - throw new IllegalStateException("Terminated by StageExceptionListener"); - } + // if (failedException != null) { + // sendTerminatingSignal(); + // // throw new IllegalStateException("Terminated by StageExceptionListener", failedException); + // throw failedException; + // } // normal and exceptional termination // stage.owningContext.getThreadCounter().dec(); } - private void sendTerminatingSignal() { - if (stage.getTerminationStrategy() == TerminationStrategy.BY_SIGNAL) { - TerminatingSignal signal = new TerminatingSignal(); - // TODO: Check if this is really needed... it seems like signals are passed on after their first arrival - for (InputPort<?> inputPort : stage.getInputPorts()) { - stage.onSignal(signal, inputPort); - } - } - } + // + // private void sendTerminatingSignal() { + // if (stage.getTerminationStrategy() == TerminationStrategy.BY_SIGNAL) { + // TerminatingSignal signal = new TerminatingSignal(); + // // TODO: Check if this is really needed... it seems like signals are passed on after their first arrival + // for (InputPort<?> inputPort : stage.getInputPorts()) { + // stage.onSignal(signal, inputPort); + // } + // } + // } protected abstract void beforeStageExecution() throws InterruptedException; diff --git a/src/main/java/teetime/framework/ExecutionException.java b/src/main/java/teetime/framework/ExecutionException.java index 7ad685af258b909be92a4e7a6eb86172c4ad2a4b..f33901b38239210b2aa5cf3df5563cd0cba1cc5c 100644 --- a/src/main/java/teetime/framework/ExecutionException.java +++ b/src/main/java/teetime/framework/ExecutionException.java @@ -32,12 +32,12 @@ public class ExecutionException extends RuntimeException { private final Collection<ThreadThrowableContainer> exceptions; public ExecutionException(final Collection<ThreadThrowableContainer> exceptions) { - super("Error(s) while execution. Check thrown exception(s)."); + super((exceptions.size() == 1) ? exceptions.toString() : "Error(s) while execution. Check thrown exception(s)."); this.exceptions = exceptions; } /** - * Returns all exceptions thrown within the analysis. + * Returns all exceptions thrown within the execution. * These are passed on as pairs of threads and throwables, to indicate a exception's context. * * @return a collection of pairs diff --git a/src/main/java/teetime/framework/exceptionHandling/StageException.java b/src/main/java/teetime/framework/exceptionHandling/StageException.java index 8edc750fd02c4b0b59c463131f47d710eccee242..524db186a33435e5f94809981cb39fef57346691 100644 --- a/src/main/java/teetime/framework/exceptionHandling/StageException.java +++ b/src/main/java/teetime/framework/exceptionHandling/StageException.java @@ -19,7 +19,7 @@ import teetime.framework.Stage; /** * Represents an Exception, which is thrown by stages in case of theyimport teetime.framework.Stage; - original exception, which was thrown, call {@link #getCause()}. {@link #getThrowingStage()} returns the stage, which has thrown the original exception. + * original exception, which was thrown, call {@link #getCause()}. {@link #getThrowingStage()} returns the stage, which has thrown the original exception. * * @since 1.1 */ @@ -46,4 +46,9 @@ public class StageException extends RuntimeException { return throwingStage; } + @Override + public String toString() { + return getCause() + " in " + throwingStage.getId(); + } + } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java index 461d3fdba022d1609d5fba2e52c25cb50d41ccd1..c9785f48b4b026e2619ea101b710b2a57c4d03f9 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/RemovePortAction.java @@ -31,6 +31,11 @@ public class RemovePortAction<T> implements PortAction<DynamicDistributor<T>> { @Override public void execute(final DynamicDistributor<T> dynamicDistributor) { - dynamicDistributor.removeDynamicPort(outputPort); + DynamicOutputPort<T> realOutputPort = outputPort; + if (outputPort instanceof PortContainer) { // BETTER replace test-specific code and abstract appropriately + realOutputPort = ((PortContainer<T>) outputPort).getPort(); + } + + dynamicDistributor.removeDynamicPort(realOutputPort); } } diff --git a/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy2.java b/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy2.java index 9abe0b0af628f64d48212b6a3d138728ddbf5b0a..9dd7e4db5bdd28ae014e77bab727333f2f335c23 100644 --- a/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy2.java +++ b/src/main/java/teetime/stage/basic/distributor/strategy/RoundRobinStrategy2.java @@ -78,7 +78,8 @@ public final class RoundRobinStrategy2 implements IDistributorStrategy { public void onPortRemoved(final OutputPort<?> removedOutputPort) { Distributor<?> distributor = (Distributor<?>) removedOutputPort.getOwningStage(); // correct the index if it is out-of-bounds - this.index = this.index % distributor.getOutputPorts().size(); + List<OutputPort<?>> outputPorts = distributor.getOutputPorts(); + this.index = this.index % outputPorts.size(); } } diff --git a/src/main/java/teetime/util/ThreadThrowableContainer.java b/src/main/java/teetime/util/ThreadThrowableContainer.java index f4fa400142c61755c13549b008dcfc928aae23f5..df9e5fd171176afd9e23b85b46c4c85c7ebcf6c8 100644 --- a/src/main/java/teetime/util/ThreadThrowableContainer.java +++ b/src/main/java/teetime/util/ThreadThrowableContainer.java @@ -17,12 +17,12 @@ package teetime.util; public final class ThreadThrowableContainer { - private final Thread first; - private final Throwable second; + private final Thread thread; + private final Throwable throwable; public ThreadThrowableContainer(final Thread first, final Throwable second) { - this.first = first; - this.second = second; + this.thread = first; + this.throwable = second; } public static ThreadThrowableContainer of(final Thread first, final Throwable second) { @@ -30,16 +30,16 @@ public final class ThreadThrowableContainer { } public Thread getThread() { - return this.first; + return this.thread; } public Throwable getThrowable() { - return this.second; + return this.throwable; } @Override public String toString() { - return second.getClass().getName() + " in " + getThread() + ": " + second.getLocalizedMessage(); + return throwable + " in " + thread; } } diff --git a/src/main/java/teetime/util/framework/port/PortList.java b/src/main/java/teetime/util/framework/port/PortList.java index a7d14053f609ff06c8fa559fe7e54f6bfa7fc622..0c0f1fd6d0028010cc4fd0ef0dde29c24aaaa6d8 100644 --- a/src/main/java/teetime/util/framework/port/PortList.java +++ b/src/main/java/teetime/util/framework/port/PortList.java @@ -30,6 +30,9 @@ public class PortList<T extends AbstractPort<?>> { public boolean remove(final T port) { boolean removed = openedPorts.remove(port); // BETTER remove by index for performance reasons firePortRemoved(port); + if (!removed) { + throw new IllegalStateException(); + } return removed; } diff --git a/src/test/java/teetime/stage/basic/distributor/dynamic/PortContainer.java b/src/test/java/teetime/stage/basic/distributor/dynamic/PortContainer.java index 8c35499536150d582e062ccb714a36aa2fb8c044..2bf43f5d8a89b94f384d9a2e6fa40cfa2ddd53f7 100644 --- a/src/test/java/teetime/stage/basic/distributor/dynamic/PortContainer.java +++ b/src/test/java/teetime/stage/basic/distributor/dynamic/PortContainer.java @@ -26,4 +26,8 @@ class PortContainer<T> extends DynamicOutputPort<T> { this.port = port; } + public DynamicOutputPort<T> getPort() { + return port; + } + }