From 1410a2de64a7a33deb697117a5d79add98c83c98 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Sun, 12 Jul 2015 14:26:41 +0200
Subject: [PATCH] added SignalingCounter

---
 .../framework/AbstractRunnableStage.java      | 62 ++++++++-----------
 .../framework/ConfigurationContext.java       |  7 +++
 .../java/teetime/framework/Execution.java     | 20 +++---
 .../java/teetime/framework/ThreadService.java | 18 ++++++
 .../concurrent/SignalingCounter.java          | 45 ++++++++++++++
 5 files changed, 105 insertions(+), 47 deletions(-)
 create mode 100644 src/main/java/teetime/framework/ThreadService.java
 create mode 100644 src/main/java/teetime/util/framework/concurrent/SignalingCounter.java

diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java
index cb9eaa61..70890e3b 100644
--- a/src/main/java/teetime/framework/AbstractRunnableStage.java
+++ b/src/main/java/teetime/framework/AbstractRunnableStage.java
@@ -32,56 +32,44 @@ abstract class AbstractRunnableStage implements Runnable {
 		this.stage = stage;
 		this.logger = LoggerFactory.getLogger(stage.getClass());
 
-		// stage.owningContext.getThreadCounter().inc();
+		if (stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) {
+			stage.owningContext.getRunnableCounter().inc();
+		}
 	}
 
 	@Override
 	public final void run() {
 		this.logger.debug("Executing runnable stage...");
-		// StageException failedException = null;
+
 		try {
-			beforeStageExecution();
 			try {
-				do {
-					executeStage();
-				} while (!stage.shouldBeTerminated());
-			} catch (StageException e) {
-				this.stage.terminate();
-				// failedException = e;
+				beforeStageExecution();
+				try {
+					do {
+						executeStage();
+					} while (!stage.shouldBeTerminated());
+				} catch (StageException e) {
+					this.stage.terminate();
+					throw e;
+				} finally {
+					afterStageExecution();
+				}
+
+			} catch (RuntimeException e) {
+				this.logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e);
 				throw e;
-			} finally {
-				afterStageExecution();
+			} catch (InterruptedException e) {
+				this.logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e);
+			}
+		} finally {
+			if (stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) {
+				stage.owningContext.getRunnableCounter().dec();
 			}
-
-		} catch (RuntimeException e) {
-			this.logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e);
-			throw e;
-		} catch (InterruptedException e) {
-			this.logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e);
 		}
 
-		this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")");
-		// if (failedException != null) {
-		// sendTerminatingSignal();
-		// // throw new IllegalStateException("Terminated by StageExceptionListener", failedException);
-		// throw failedException;
-		// }
-
-		// normal and exceptional termination
-		// stage.owningContext.getThreadCounter().dec();
+		logger.debug("Finished runnable stage. (" + this.stage.getId() + ")");
 	}
 
-	//
-	// private void sendTerminatingSignal() {
-	// if (stage.getTerminationStrategy() == TerminationStrategy.BY_SIGNAL) {
-	// TerminatingSignal signal = new TerminatingSignal();
-	// // TODO: Check if this is really needed... it seems like signals are passed on after their first arrival
-	// for (InputPort<?> inputPort : stage.getInputPorts()) {
-	// stage.onSignal(signal, inputPort);
-	// }
-	// }
-	// }
-
 	protected abstract void beforeStageExecution() throws InterruptedException;
 
 	protected abstract void executeStage();
diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java
index a4668f08..529164db 100644
--- a/src/main/java/teetime/framework/ConfigurationContext.java
+++ b/src/main/java/teetime/framework/ConfigurationContext.java
@@ -22,6 +22,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import teetime.framework.pipe.InstantiationPipe;
+import teetime.util.framework.concurrent.SignalingCounter;
 
 /**
  * Represents a context that is used by a configuration and composite stages to connect ports, for example.
@@ -37,12 +38,18 @@ final class ConfigurationContext {
 
 	private Map<Stage, String> threadableStages = new HashMap<Stage, String>();
 
+	private final SignalingCounter runnableCounter = new SignalingCounter();
+
 	ConfigurationContext() {}
 
 	Map<Stage, String> getThreadableStages() {
 		return this.threadableStages;
 	}
 
+	SignalingCounter getRunnableCounter() {
+		return runnableCounter;
+	}
+
 	/**
 	 * @see AbstractCompositeStage#addThreadableStage(Stage)
 	 */
diff --git a/src/main/java/teetime/framework/Execution.java b/src/main/java/teetime/framework/Execution.java
index d6971d22..a0eac59b 100644
--- a/src/main/java/teetime/framework/Execution.java
+++ b/src/main/java/teetime/framework/Execution.java
@@ -226,17 +226,17 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
 	 */
 	public void waitForTermination() {
 		try {
-			// stage.owningContext.getThreadCounter().await(0);
+			getConfiguration().getContext().getRunnableCounter().waitFor(0);
 
-			LOGGER.debug("Waiting for finiteProducerThreads");
-			for (Thread thread : this.finiteProducerThreads) {
-				thread.join();
-			}
-
-			LOGGER.debug("Waiting for consumerThreads");
-			for (Thread thread : this.consumerThreads) {
-				thread.join();
-			}
+			// LOGGER.debug("Waiting for finiteProducerThreads");
+			// for (Thread thread : this.finiteProducerThreads) {
+			// thread.join();
+			// }
+			//
+			// LOGGER.debug("Waiting for consumerThreads");
+			// for (Thread thread : this.consumerThreads) {
+			// thread.join();
+			// }
 		} catch (InterruptedException e) {
 			LOGGER.error("Execution has stopped unexpectedly", e);
 			for (Thread thread : this.finiteProducerThreads) {
diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java
new file mode 100644
index 00000000..a778d108
--- /dev/null
+++ b/src/main/java/teetime/framework/ThreadService.java
@@ -0,0 +1,18 @@
+package teetime.framework;
+
+public class ThreadService {
+
+	public Runnable startWithinNewThread(final Stage stage) {
+		Runnable runnable = wrap(stage);
+		Thread thread = new Thread(runnable);
+		thread.start();
+		return runnable;
+	}
+
+	private AbstractRunnableStage wrap(final Stage stage) {
+		if (stage.getInputPorts().size() > 0) {
+			return new RunnableConsumerStage(stage);
+		}
+		return new RunnableProducerStage(stage);
+	}
+}
diff --git a/src/main/java/teetime/util/framework/concurrent/SignalingCounter.java b/src/main/java/teetime/util/framework/concurrent/SignalingCounter.java
new file mode 100644
index 00000000..a71235d0
--- /dev/null
+++ b/src/main/java/teetime/util/framework/concurrent/SignalingCounter.java
@@ -0,0 +1,45 @@
+package teetime.util.framework.concurrent;
+
+import com.carrotsearch.hppc.IntObjectHashMap;
+import com.carrotsearch.hppc.IntObjectMap;
+
+public class SignalingCounter {
+
+	private final IntObjectMap<Object> conditions = new IntObjectHashMap<Object>();
+	private int counter;
+
+	// synchronized methods synchronize the map and the counter
+	// synchronized(cond) synchronizes the individual numbers for which are being waited for
+
+	public synchronized void inc() {
+		counter++;
+		conditionalNotifyAll(counter);
+	}
+
+	public synchronized void dec() {
+		counter--;
+		conditionalNotifyAll(counter);
+	}
+
+	private synchronized void conditionalNotifyAll(final int number) {
+		if (conditions.containsKey(number)) {
+			Object cond = conditions.get(number);
+			synchronized (cond) {
+				cond.notifyAll();
+			}
+		}
+	}
+
+	public synchronized void waitFor(final int number) throws InterruptedException {
+		if (!conditions.containsKey(number)) {
+			conditions.put(number, new Object());
+		}
+
+		final Object cond = conditions.get(number);
+		synchronized (cond) {
+			while (counter != number) {
+				cond.wait();
+			}
+		}
+	}
+}
-- 
GitLab