diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 9ac7e9dfb07d623994b9b3997e6eccf72652d13a..4f634f276c9ec6266aa731c7205c496d8e0a7494 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -34,7 +34,7 @@ public abstract class AbstractStage extends Stage { private final PortList<InputPort<?>> inputPorts = new PortList<InputPort<?>>(); private final PortList<OutputPort<?>> outputPorts = new PortList<OutputPort<?>>(); - private StageState currentState = StageState.CREATED; + private volatile StageState currentState = StageState.CREATED; @Override protected List<InputPort<?>> getInputPorts() { diff --git a/src/main/java/teetime/framework/exceptionHandling/StageException.java b/src/main/java/teetime/framework/exceptionHandling/StageException.java index 524db186a33435e5f94809981cb39fef57346691..7468e439b4fa5229b3c1ece5499dab9d907f6fce 100644 --- a/src/main/java/teetime/framework/exceptionHandling/StageException.java +++ b/src/main/java/teetime/framework/exceptionHandling/StageException.java @@ -18,7 +18,7 @@ package teetime.framework.exceptionHandling; import teetime.framework.Stage; /** - * Represents an Exception, which is thrown by stages in case of theyimport teetime.framework.Stage; + * Represents an Exception, which is thrown by stages in case of they import teetime.framework.Stage; * original exception, which was thrown, call {@link #getCause()}. {@link #getThrowingStage()} returns the stage, which has thrown the original exception. * * @since 1.1 diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index eb58b9e068777a71bd1c8cd490c338662117ea86..2580bdc18c1f1b55ec08fcac6227c430752c8e4d 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -39,7 +39,8 @@ final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe public boolean add(final Object element) { while (!this.queue.offer(element)) { // Thread.yield(); - if (this.cachedTargetStage.getCurrentState() == StageState.TERMINATED) { + if (this.cachedTargetStage.getCurrentState() == StageState.TERMINATED && + this.getSourcePort().getOwningStage().getCurrentState() == StageState.TERMINATING) { return false; } this.numWaits++;