From 7b88940f5e4ff8451ec18ae6b4c05290025d7c24 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Wed, 17 Dec 2014 13:38:31 +0100
Subject: [PATCH] added new runnable stage concept

---
 .settings/edu.umd.cs.findbugs.core.prefs      |  2 +-
 src/main/java/teetime/framework/Analysis.java | 38 ++++++++++---
 .../framework/NotEnoughInputException.java    | 13 +++++
 .../framework/RunnableConsumerStage.java      | 19 +++++++
 .../framework/RunnableProducerStage.java      | 24 +++++++++
 .../java/teetime/framework/RunnableStage.java | 53 +++++++------------
 src/main/java/teetime/stage/Relay.java        |  1 +
 .../MethodCallThroughputAnalysis9.java        |  4 +-
 .../MethodCallThroughputAnalysis10.java       |  4 +-
 .../MethodCallThroughputAnalysis11.java       |  4 +-
 .../MethodCallThroughputAnalysis14.java       |  4 +-
 .../MethodCallThroughputAnalysis15.java       |  6 +--
 .../MethodCallThroughputAnalysis16.java       |  6 +--
 .../MethodCallThroughputAnalysis17.java       |  8 +--
 .../MethodCallThroughputAnalysis19.java       |  6 +--
 15 files changed, 131 insertions(+), 61 deletions(-)
 create mode 100644 src/main/java/teetime/framework/NotEnoughInputException.java
 create mode 100644 src/main/java/teetime/framework/RunnableConsumerStage.java
 create mode 100644 src/main/java/teetime/framework/RunnableProducerStage.java

diff --git a/.settings/edu.umd.cs.findbugs.core.prefs b/.settings/edu.umd.cs.findbugs.core.prefs
index 32039c99..5ce4d272 100644
--- a/.settings/edu.umd.cs.findbugs.core.prefs
+++ b/.settings/edu.umd.cs.findbugs.core.prefs
@@ -1,5 +1,5 @@
 #FindBugs User Preferences
-#Wed Dec 17 07:37:54 CET 2014
+#Wed Dec 17 09:02:35 CET 2014
 detector_threshold=3
 effort=max
 excludefilter0=.fbExcludeFilterFile|true
diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java
index 778cbae5..7d80f166 100644
--- a/src/main/java/teetime/framework/Analysis.java
+++ b/src/main/java/teetime/framework/Analysis.java
@@ -9,6 +9,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import teetime.framework.signal.ValidatingSignal;
+import teetime.framework.validation.AnalysisNotValidException;
 import teetime.util.Pair;
 
 /**
@@ -30,16 +32,35 @@ public class Analysis implements UncaughtExceptionHandler {
 
 	private final Collection<Pair<Thread, Throwable>> exceptions = new ConcurrentLinkedQueue<Pair<Thread, Throwable>>();
 
+	/**
+	 * Creates a new {@link Analysis} that initially validates the port connections.
+	 *
+	 * @param configuration
+	 *            to be used for the analysis
+	 */
 	public Analysis(final AnalysisConfiguration configuration) {
+		this(configuration, true);
+	}
+
+	public Analysis(final AnalysisConfiguration configuration, final boolean validationEnabled) {
 		this.configuration = configuration;
-		validateStages();
+		if (validationEnabled) {
+			validateStages();
+		}
 	}
 
 	private void validateStages() {
 		// BETTER validate concurrently
 		final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs();
 		for (Stage stage : threadableStageJobs) {
-			// portConnectionValidator.validate(stage);
+			// // portConnectionValidator.validate(stage);
+			// }
+
+			final ValidatingSignal validatingSignal = new ValidatingSignal();
+			stage.onSignal(validatingSignal, null);
+			if (validatingSignal.getInvalidPortConnections().size() > 0) {
+				throw new AnalysisNotValidException(validatingSignal.getInvalidPortConnections());
+			}
 		}
 	}
 
@@ -49,17 +70,22 @@ public class Analysis implements UncaughtExceptionHandler {
 	public void init() {
 		final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs();
 		for (Stage stage : threadableStageJobs) {
-			final Thread thread = new Thread(new RunnableStage(stage));
 			switch (stage.getTerminationStrategy()) {
-			case BY_SIGNAL:
+			case BY_SIGNAL: {
+				final Thread thread = new Thread(new RunnableConsumerStage(stage));
 				this.consumerThreads.add(thread);
 				break;
-			case BY_SELF_DECISION:
+			}
+			case BY_SELF_DECISION: {
+				final Thread thread = new Thread(new RunnableProducerStage(stage));
 				this.finiteProducerThreads.add(thread);
 				break;
-			case BY_INTERRUPT:
+			}
+			case BY_INTERRUPT: {
+				final Thread thread = new Thread(new RunnableProducerStage(stage));
 				this.infiniteProducerThreads.add(thread);
 				break;
+			}
 			default:
 				break;
 			}
diff --git a/src/main/java/teetime/framework/NotEnoughInputException.java b/src/main/java/teetime/framework/NotEnoughInputException.java
new file mode 100644
index 00000000..a26f482b
--- /dev/null
+++ b/src/main/java/teetime/framework/NotEnoughInputException.java
@@ -0,0 +1,13 @@
+package teetime.framework;
+
+final class NotEnoughInputException extends RuntimeException {
+
+	private static final long serialVersionUID = -2517233596919204396L;
+
+	@SuppressWarnings("PMD.AvoidSynchronizedAtMethodLevel")
+	@Override
+	public synchronized Throwable fillInStackTrace() {
+		return this;
+	}
+
+}
diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java
new file mode 100644
index 00000000..3b111a94
--- /dev/null
+++ b/src/main/java/teetime/framework/RunnableConsumerStage.java
@@ -0,0 +1,19 @@
+package teetime.framework;
+
+public final class RunnableConsumerStage extends RunnableStage {
+
+	public RunnableConsumerStage(final Stage stage) {
+		super(stage);
+	}
+
+	@Override
+	protected void beforeStageExecution() {
+		// TODO wait for starting signal
+	}
+
+	@Override
+	protected void afterStageExecution() {
+		// do nothing
+	}
+
+}
diff --git a/src/main/java/teetime/framework/RunnableProducerStage.java b/src/main/java/teetime/framework/RunnableProducerStage.java
new file mode 100644
index 00000000..35aeb24e
--- /dev/null
+++ b/src/main/java/teetime/framework/RunnableProducerStage.java
@@ -0,0 +1,24 @@
+package teetime.framework;
+
+import teetime.framework.signal.StartingSignal;
+import teetime.framework.signal.TerminatingSignal;
+
+public final class RunnableProducerStage extends RunnableStage {
+
+	public RunnableProducerStage(final Stage stage) {
+		super(stage);
+	}
+
+	@Override
+	protected void beforeStageExecution() {
+		final StartingSignal startingSignal = new StartingSignal();
+		this.stage.onSignal(startingSignal, null);
+	}
+
+	@Override
+	protected void afterStageExecution() {
+		final TerminatingSignal terminatingSignal = new TerminatingSignal();
+		this.stage.onSignal(terminatingSignal, null);
+	}
+
+}
diff --git a/src/main/java/teetime/framework/RunnableStage.java b/src/main/java/teetime/framework/RunnableStage.java
index 3c1ccbe3..7f92379d 100644
--- a/src/main/java/teetime/framework/RunnableStage.java
+++ b/src/main/java/teetime/framework/RunnableStage.java
@@ -3,17 +3,11 @@ package teetime.framework;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import teetime.framework.signal.StartingSignal;
-import teetime.framework.signal.TerminatingSignal;
-import teetime.framework.signal.ValidatingSignal;
-import teetime.framework.validation.AnalysisNotValidException;
+abstract class RunnableStage implements Runnable {
 
-public class RunnableStage implements Runnable {
-
-	private final Stage stage;
+	protected final Stage stage;
 	@SuppressWarnings("PMD.LoggerIsNotStaticFinal")
-	private final Logger logger;
-	private boolean validationEnabled;
+	protected final Logger logger;
 
 	public RunnableStage(final Stage stage) {
 		this.stage = stage;
@@ -21,44 +15,37 @@ public class RunnableStage implements Runnable {
 	}
 
 	@Override
-	public void run() {
+	public final void run() {
 		this.logger.debug("Executing runnable stage...");
 
-		if (this.validationEnabled) {
-			final ValidatingSignal validatingSignal = new ValidatingSignal();
-			this.stage.onSignal(validatingSignal, null);
-			if (validatingSignal.getInvalidPortConnections().size() > 0) {
-				throw new AnalysisNotValidException(validatingSignal.getInvalidPortConnections());
-			}
-		}
-
 		try {
-			final StartingSignal startingSignal = new StartingSignal();
-			this.stage.onSignal(startingSignal, null);
+			beforeStageExecution();
 
 			do {
-				this.stage.executeWithPorts();
+				try {
+					this.stage.executeWithPorts();
+				} catch (NotEnoughInputException e) {
+					// 1. check for terminating signal
+					// new Thread().getState() == State.WAITING
+
+					// 2. check for no input reaction: this.getStrategy()
+					// 2.1 if BUSY_WAITING with timeout to then sleep(to)
+					// 2.2 if BLOCKING_WAIT then
+
+				}
 			} while (!this.stage.shouldBeTerminated());
 
-			final TerminatingSignal terminatingSignal = new TerminatingSignal();
-			this.stage.onSignal(terminatingSignal, null);
+			afterStageExecution();
 
 		} catch (Error e) {
 			this.logger.error("Terminating thread due to the following exception: ", e);
 			throw e;
-		} // catch (RuntimeException e) {
-			// this.logger.error("Terminating thread due to the following exception: ", e);
-			// throw e;
-			// }
+		}
 
 		this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")");
 	}
 
-	public boolean isValidationEnabled() {
-		return this.validationEnabled;
-	}
+	protected abstract void beforeStageExecution();
 
-	public void setValidationEnabled(final boolean validationEnabled) {
-		this.validationEnabled = validationEnabled;
-	}
+	protected abstract void afterStageExecution();
 }
diff --git a/src/main/java/teetime/stage/Relay.java b/src/main/java/teetime/stage/Relay.java
index 96e30334..4c1c19a2 100644
--- a/src/main/java/teetime/stage/Relay.java
+++ b/src/main/java/teetime/stage/Relay.java
@@ -20,6 +20,7 @@ public final class Relay<T> extends AbstractProducerStage<T> {
 			}
 			Thread.yield();
 			return;
+			// returnNoElement();
 		}
 		outputPort.send(element);
 	}
diff --git a/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java b/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java
index e5e38cab..9f96c90e 100644
--- a/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java
+++ b/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java
@@ -19,7 +19,7 @@ import java.util.List;
 
 import teetime.framework.Stage;
 import teetime.framework.OldHeadPipeline;
-import teetime.framework.RunnableStage;
+import teetime.framework.RunnableProducerStage;
 import teetime.framework.pipe.CommittablePipe;
 import teetime.stage.CollectorSink;
 import teetime.stage.NoopFilter;
@@ -44,7 +44,7 @@ public class MethodCallThroughputAnalysis9 {
 
 	public void init() {
 		Stage pipeline = this.buildPipeline();
-		this.runnable = new RunnableStage(pipeline);
+		this.runnable = new RunnableProducerStage(pipeline);
 	}
 
 	/**
diff --git a/src/performancetest/java/teetime/examples/experiment10/MethodCallThroughputAnalysis10.java b/src/performancetest/java/teetime/examples/experiment10/MethodCallThroughputAnalysis10.java
index 6fac5159..70b476ff 100644
--- a/src/performancetest/java/teetime/examples/experiment10/MethodCallThroughputAnalysis10.java
+++ b/src/performancetest/java/teetime/examples/experiment10/MethodCallThroughputAnalysis10.java
@@ -18,7 +18,7 @@ package teetime.examples.experiment10;
 import java.util.List;
 
 import teetime.framework.OldHeadPipeline;
-import teetime.framework.RunnableStage;
+import teetime.framework.RunnableProducerStage;
 import teetime.framework.pipe.SingleElementPipe;
 import teetime.stage.CollectorSink;
 import teetime.stage.NoopFilter;
@@ -73,7 +73,7 @@ public class MethodCallThroughputAnalysis10 {
 		SingleElementPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
 		SingleElementPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
 
-		return new RunnableStage(pipeline);
+		return new RunnableProducerStage(pipeline);
 	}
 
 	public void start() {
diff --git a/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java b/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java
index ac70216d..22618b7f 100644
--- a/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java
+++ b/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java
@@ -19,7 +19,7 @@ import java.util.List;
 
 import teetime.framework.Stage;
 import teetime.framework.OldHeadPipeline;
-import teetime.framework.RunnableStage;
+import teetime.framework.RunnableProducerStage;
 import teetime.framework.pipe.UnorderedGrowablePipe;
 import teetime.stage.CollectorSink;
 import teetime.stage.NoopFilter;
@@ -44,7 +44,7 @@ public class MethodCallThroughputAnalysis11 {
 
 	public void init() {
 		Stage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator);
-		this.runnable = new RunnableStage(pipeline);
+		this.runnable = new RunnableProducerStage(pipeline);
 	}
 
 	private OldHeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(final long numInputObjects,
diff --git a/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java b/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java
index 658ce7a0..2f871761 100644
--- a/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java
+++ b/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java
@@ -19,7 +19,7 @@ import java.util.List;
 
 import teetime.framework.Stage;
 import teetime.framework.OldHeadPipeline;
-import teetime.framework.RunnableStage;
+import teetime.framework.RunnableProducerStage;
 import teetime.framework.pipe.IPipeFactory;
 import teetime.framework.pipe.PipeFactoryRegistry;
 import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
@@ -48,7 +48,7 @@ public class MethodCallThroughputAnalysis14 {
 
 	public void init() {
 		Stage pipeline = this.buildPipeline();
-		this.runnable = new RunnableStage(pipeline);
+		this.runnable = new RunnableProducerStage(pipeline);
 	}
 
 	/**
diff --git a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java
index f6c65eed..1dda610e 100644
--- a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java
+++ b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java
@@ -19,7 +19,7 @@ import java.util.List;
 
 import teetime.framework.Stage;
 import teetime.framework.OldHeadPipeline;
-import teetime.framework.RunnableStage;
+import teetime.framework.RunnableProducerStage;
 import teetime.framework.pipe.OrderedGrowableArrayPipe;
 import teetime.framework.pipe.SingleElementPipe;
 import teetime.framework.pipe.SpScPipe;
@@ -56,10 +56,10 @@ public class MethodCallThroughputAnalysis15 {
 	public void init() {
 
 		OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline();
-		this.clockRunnable = new RunnableStage(clockPipeline);
+		this.clockRunnable = new RunnableProducerStage(clockPipeline);
 
 		Stage pipeline = this.buildPipeline(this.clock);
-		this.runnable = new RunnableStage(pipeline);
+		this.runnable = new RunnableProducerStage(pipeline);
 	}
 
 	private OldHeadPipeline<Clock, Sink<Long>> buildClockPipeline() {
diff --git a/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java b/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java
index b48c2378..167c3f97 100644
--- a/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java
+++ b/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java
@@ -20,7 +20,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 import teetime.framework.OldHeadPipeline;
-import teetime.framework.RunnableStage;
+import teetime.framework.RunnableProducerStage;
 import teetime.framework.pipe.SingleElementPipe;
 import teetime.framework.pipe.SpScPipe;
 import teetime.stage.CollectorSink;
@@ -58,7 +58,7 @@ public class MethodCallThroughputAnalysis16 {
 	public void init() {
 		OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects,
 				this.inputObjectCreator);
-		this.producerThread = new Thread(new RunnableStage(producerPipeline));
+		this.producerThread = new Thread(new RunnableProducerStage(producerPipeline));
 
 		this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads);
 
@@ -68,7 +68,7 @@ public class MethodCallThroughputAnalysis16 {
 			this.timestampObjectsList.add(resultList);
 
 			OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> workerPipeline = this.buildPipeline(producerPipeline, resultList);
-			this.workerThreads[i] = new Thread(new RunnableStage(workerPipeline));
+			this.workerThreads[i] = new Thread(new RunnableProducerStage(workerPipeline));
 		}
 	}
 
diff --git a/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java b/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java
index 60baf45d..1dc83abb 100644
--- a/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java
+++ b/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java
@@ -20,7 +20,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 import teetime.framework.OldHeadPipeline;
-import teetime.framework.RunnableStage;
+import teetime.framework.RunnableProducerStage;
 import teetime.framework.Stage;
 import teetime.framework.pipe.DummyPipe;
 import teetime.framework.pipe.IPipe;
@@ -62,7 +62,7 @@ public class MethodCallThroughputAnalysis17 {
 	public void init() {
 		OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects,
 				this.inputObjectCreator);
-		this.producerThread = new Thread(new RunnableStage(producerPipeline));
+		this.producerThread = new Thread(new RunnableProducerStage(producerPipeline));
 
 		int numWorkerThreads = Math.min(NUM_WORKER_THREADS, 1); // only for testing purpose
 
@@ -72,7 +72,7 @@ public class MethodCallThroughputAnalysis17 {
 			this.timestampObjectsList.add(resultList);
 
 			OldHeadPipeline<?, ?> pipeline = this.buildPipeline(null, resultList);
-			this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
+			this.workerThreads[i] = new Thread(new RunnableProducerStage(pipeline));
 		}
 
 		// this.producerThread = new Thread(new Runnable() {
@@ -96,7 +96,7 @@ public class MethodCallThroughputAnalysis17 {
 
 		// this.producerThread.start();
 		// this.producerThread.run();
-		new RunnableStage(producerPipeline).run();
+		new RunnableProducerStage(producerPipeline).run();
 
 		// try {
 		// this.producerThread.join();
diff --git a/src/performancetest/java/teetime/examples/experiment19/MethodCallThroughputAnalysis19.java b/src/performancetest/java/teetime/examples/experiment19/MethodCallThroughputAnalysis19.java
index f38312f5..8a0d11df 100644
--- a/src/performancetest/java/teetime/examples/experiment19/MethodCallThroughputAnalysis19.java
+++ b/src/performancetest/java/teetime/examples/experiment19/MethodCallThroughputAnalysis19.java
@@ -20,7 +20,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 import teetime.framework.OldHeadPipeline;
-import teetime.framework.RunnableStage;
+import teetime.framework.RunnableProducerStage;
 import teetime.framework.pipe.OrderedGrowableArrayPipe;
 import teetime.framework.pipe.SpScPipe;
 import teetime.stage.CollectorSink;
@@ -58,7 +58,7 @@ public class MethodCallThroughputAnalysis19 {
 	public void init() {
 		OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects,
 				this.inputObjectCreator);
-		this.producerThread = new Thread(new RunnableStage(producerPipeline));
+		this.producerThread = new Thread(new RunnableProducerStage(producerPipeline));
 
 		this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads);
 
@@ -68,7 +68,7 @@ public class MethodCallThroughputAnalysis19 {
 			this.timestampObjectsList.add(resultList);
 
 			OldHeadPipeline<?, ?> pipeline = this.buildPipeline(producerPipeline.getLastStage(), resultList);
-			this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
+			this.workerThreads[i] = new Thread(new RunnableProducerStage(pipeline));
 		}
 
 	}
-- 
GitLab