diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index c0b77b24341d3d7f40404fbfa37fbcf3f77cccd1..4d0218d23051bb115fbfb0dff4f097f72d098857 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -271,6 +271,7 @@ public abstract class AbstractStage extends Stage { @Override protected void terminate() { changeState(StageState.TERMINATING); + owningThread.interrupt(); } @Override diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index 2580bdc18c1f1b55ec08fcac6227c430752c8e4d..2937fb6d2895ddbcbe007de70b84f80e520b2339 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -19,6 +19,7 @@ import teetime.framework.AbstractInterThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.StageState; +import teetime.framework.exceptionHandling.TerminateException; import teetime.util.framework.concurrent.queue.ObservableSpScArrayQueue; final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe { @@ -39,16 +40,15 @@ final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe public boolean add(final Object element) { while (!this.queue.offer(element)) { // Thread.yield(); - if (this.cachedTargetStage.getCurrentState() == StageState.TERMINATED && - this.getSourcePort().getOwningStage().getCurrentState() == StageState.TERMINATING) { - return false; + if (this.cachedTargetStage.getCurrentState() == StageState.TERMINATED || + Thread.currentThread().isInterrupted()) { + throw TerminateException.INSTANCE; } this.numWaits++; try { Thread.sleep(1); } catch (InterruptedException e) { - // FIXME Handle it correctly - e.printStackTrace(); + throw TerminateException.INSTANCE; } } // this.reportNewElement();