From 41fff673012b195dc1a6b7773e7a75124834e44b Mon Sep 17 00:00:00 2001
From: Nelson Tavares de Sousa <stu103017@mail.uni-kiel.de>
Date: Mon, 27 Jul 2015 16:11:18 +0200
Subject: [PATCH] better thread termination

---
 src/main/java/teetime/framework/AbstractStage.java |  1 +
 src/main/java/teetime/framework/pipe/SpScPipe.java | 10 +++++-----
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java
index c0b77b24..4d0218d2 100644
--- a/src/main/java/teetime/framework/AbstractStage.java
+++ b/src/main/java/teetime/framework/AbstractStage.java
@@ -271,6 +271,7 @@ public abstract class AbstractStage extends Stage {
 	@Override
 	protected void terminate() {
 		changeState(StageState.TERMINATING);
+		owningThread.interrupt();
 	}
 
 	@Override
diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java
index 2580bdc1..2937fb6d 100644
--- a/src/main/java/teetime/framework/pipe/SpScPipe.java
+++ b/src/main/java/teetime/framework/pipe/SpScPipe.java
@@ -19,6 +19,7 @@ import teetime.framework.AbstractInterThreadPipe;
 import teetime.framework.InputPort;
 import teetime.framework.OutputPort;
 import teetime.framework.StageState;
+import teetime.framework.exceptionHandling.TerminateException;
 import teetime.util.framework.concurrent.queue.ObservableSpScArrayQueue;
 
 final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe {
@@ -39,16 +40,15 @@ final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe
 	public boolean add(final Object element) {
 		while (!this.queue.offer(element)) {
 			// Thread.yield();
-			if (this.cachedTargetStage.getCurrentState() == StageState.TERMINATED &&
-					this.getSourcePort().getOwningStage().getCurrentState() == StageState.TERMINATING) {
-				return false;
+			if (this.cachedTargetStage.getCurrentState() == StageState.TERMINATED ||
+					Thread.currentThread().isInterrupted()) {
+				throw TerminateException.INSTANCE;
 			}
 			this.numWaits++;
 			try {
 				Thread.sleep(1);
 			} catch (InterruptedException e) {
-				// FIXME Handle it correctly
-				e.printStackTrace();
+				throw TerminateException.INSTANCE;
 			}
 		}
 		// this.reportNewElement();
-- 
GitLab