From c51c2e8e0d3c0efffd797d15a96852bfea0b09ec Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Fri, 19 Dec 2014 08:37:59 +0100
Subject: [PATCH] renamed logback to logback-test; added catch block for
 RuntimeException in RunnableStage; adapted expected perf results; added
 Stage.returnNoElement()

---
 .../framework/AbstractConsumerStage.java      |  8 ++++--
 .../java/teetime/framework/AbstractStage.java | 25 +++++++++++--------
 .../framework/RunnableConsumerStage.java      |  1 +
 .../java/teetime/framework/RunnableStage.java |  3 +++
 src/main/java/teetime/framework/Stage.java    |  5 ++++
 src/main/java/teetime/stage/Relay.java        |  7 ------
 src/main/java/teetime/stage/basic/Delay.java  |  2 +-
 .../teetime/stage/basic/merger/Merger.java    |  2 +-
 .../experiment09/ChwWorkPerformanceCheck.java |  4 ++-
 .../experiment14/ChwWorkPerformanceCheck.java |  4 ++-
 .../experiment16/ChwWorkPerformanceCheck.java |  6 +++--
 .../framework/RunnableConsumerStageTest.java  |  6 +++--
 .../{logback.xml => logback-test.xml}         |  4 +--
 13 files changed, 47 insertions(+), 30 deletions(-)
 rename src/test/resources/{logback.xml => logback-test.xml} (88%)

diff --git a/src/main/java/teetime/framework/AbstractConsumerStage.java b/src/main/java/teetime/framework/AbstractConsumerStage.java
index 7346e281..1cc3bdb0 100644
--- a/src/main/java/teetime/framework/AbstractConsumerStage.java
+++ b/src/main/java/teetime/framework/AbstractConsumerStage.java
@@ -1,20 +1,24 @@
 package teetime.framework;
 
 import teetime.framework.idle.IdleStrategy;
+import teetime.framework.idle.YieldStrategy;
 
 public abstract class AbstractConsumerStage<I> extends AbstractStage {
 
 	protected final InputPort<I> inputPort = this.createInputPort();
 
-	private IdleStrategy idleStrategy; // FIXME remove this word-around
+	private IdleStrategy idleStrategy = new YieldStrategy(); // FIXME remove this word-around
 
 	public final InputPort<I> getInputPort() {
 		return this.inputPort;
 	}
 
 	@Override
-	public void executeWithPorts() {
+	public final void executeWithPorts() {
 		final I element = this.getInputPort().receive();
+		if (null == element) {
+			returnNoElement();
+		}
 
 		this.execute(element);
 	}
diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java
index 680d1b89..6dc10ce0 100644
--- a/src/main/java/teetime/framework/AbstractStage.java
+++ b/src/main/java/teetime/framework/AbstractStage.java
@@ -21,19 +21,10 @@ public abstract class AbstractStage extends Stage {
 	protected OutputPort<?>[] cachedOutputPorts;
 
 	private final Set<ISignal> triggeredSignals = new HashSet<ISignal>();
+	// BETTER aggregate both states in an enum
 	private boolean shouldTerminate;
-
 	private boolean started;
 
-	private void connectUnconnectedOutputPorts() {
-		for (OutputPort<?> outputPort : this.cachedOutputPorts) {
-			if (null == outputPort.getPipe()) { // if port is unconnected
-				this.logger.warn("Unconnected output port: " + outputPort + ". Connecting with a dummy output port.");
-				outputPort.setPipe(new DummyPipe());
-			}
-		}
-	}
-
 	/**
 	 * @return the stage's input ports
 	 */
@@ -53,6 +44,7 @@ public abstract class AbstractStage extends Stage {
 	/**
 	 * May not be invoked outside of IPipe implementations
 	 */
+	@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
 	@Override
 	public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
 		if (!this.signalAlreadyReceived(signal, inputPort)) {
@@ -97,7 +89,17 @@ public abstract class AbstractStage extends Stage {
 
 		this.connectUnconnectedOutputPorts();
 		started = true;
-		logger.info(this + " started.");
+		logger.debug("Started.");
+	}
+
+	@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
+	private void connectUnconnectedOutputPorts() {
+		for (OutputPort<?> outputPort : this.cachedOutputPorts) {
+			if (null == outputPort.getPipe()) { // if port is unconnected
+				this.logger.warn("Unconnected output port: " + outputPort + ". Connecting with a dummy output port.");
+				outputPort.setPipe(new DummyPipe());
+			}
+		}
 	}
 
 	public void onTerminating() throws Exception {
@@ -128,6 +130,7 @@ public abstract class AbstractStage extends Stage {
 		return outputPort;
 	}
 
+	@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
 	@Override
 	public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
 		// for (OutputPort<?> outputPort : this.getOutputPorts()) {
diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java
index 209f826e..a498171c 100644
--- a/src/main/java/teetime/framework/RunnableConsumerStage.java
+++ b/src/main/java/teetime/framework/RunnableConsumerStage.java
@@ -53,6 +53,7 @@ public final class RunnableConsumerStage extends RunnableStage {
 		}
 	}
 
+	@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
 	private void checkforSignals() {
 		// FIXME should getInputPorts() really be defined in Stage?
 		InputPort<?>[] inputPorts = stage.getInputPorts();
diff --git a/src/main/java/teetime/framework/RunnableStage.java b/src/main/java/teetime/framework/RunnableStage.java
index f23e7ed8..b79b8d2d 100644
--- a/src/main/java/teetime/framework/RunnableStage.java
+++ b/src/main/java/teetime/framework/RunnableStage.java
@@ -30,6 +30,9 @@ abstract class RunnableStage implements Runnable {
 		} catch (Error e) {
 			this.logger.error("Terminating thread due to the following exception: ", e);
 			throw e;
+		} catch (RuntimeException e) {
+			this.logger.error("Terminating thread due to the following exception: ", e);
+			throw e;
 		}
 
 		this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")");
diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java
index d7f0b043..6c13ebdd 100644
--- a/src/main/java/teetime/framework/Stage.java
+++ b/src/main/java/teetime/framework/Stage.java
@@ -18,6 +18,7 @@ import teetime.framework.validation.InvalidPortConnection;
 public abstract class Stage {
 
 	private static final ConcurrentMap<String, Integer> INSTANCES_COUNTER = new ConcurrentHashMap<String, Integer>();
+	private static final NotEnoughInputException NOT_ENOUGH_INPUT_EXCEPTION = new NotEnoughInputException();
 
 	private final String id;
 	/**
@@ -67,6 +68,10 @@ public abstract class Stage {
 	//
 	// public abstract void setParentStage(Stage parentStage, int index);
 
+	protected final void returnNoElement() {
+		throw NOT_ENOUGH_INPUT_EXCEPTION;
+	}
+
 	/**
 	 * This should check, if the OutputPorts are connected correctly. This is needed to avoid NullPointerExceptions and other errors.
 	 *
diff --git a/src/main/java/teetime/stage/Relay.java b/src/main/java/teetime/stage/Relay.java
index 98349057..ae98cdaf 100644
--- a/src/main/java/teetime/stage/Relay.java
+++ b/src/main/java/teetime/stage/Relay.java
@@ -1,7 +1,6 @@
 package teetime.stage;
 
 import teetime.framework.AbstractConsumerStage;
-import teetime.framework.NotEnoughInputException;
 import teetime.framework.OutputPort;
 
 public final class Relay<T> extends AbstractConsumerStage<T> {
@@ -11,8 +10,6 @@ public final class Relay<T> extends AbstractConsumerStage<T> {
 
 	// private AbstractInterThreadPipe cachedCastedInputPipe;
 
-	private static final NotEnoughInputException NOT_ENOUGH_INPUT_EXCEPTION = new NotEnoughInputException();
-
 	@Override
 	protected void execute(final T element) {
 		if (null == element) {
@@ -27,10 +24,6 @@ public final class Relay<T> extends AbstractConsumerStage<T> {
 		outputPort.send(element);
 	}
 
-	private void returnNoElement() {
-		throw NOT_ENOUGH_INPUT_EXCEPTION;
-	}
-
 	// @Override
 	// public void onStarting() throws Exception {
 	// super.onStarting();
diff --git a/src/main/java/teetime/stage/basic/Delay.java b/src/main/java/teetime/stage/basic/Delay.java
index a06bf6cb..0d68e707 100644
--- a/src/main/java/teetime/stage/basic/Delay.java
+++ b/src/main/java/teetime/stage/basic/Delay.java
@@ -24,7 +24,7 @@ public final class Delay<T> extends AbstractStage {
 
 		Long timestampTrigger = this.timestampTriggerInputPort.receive();
 		if (null == timestampTrigger) {
-			return;
+			returnNoElement();
 		}
 
 		sendAllBufferedEllements();
diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java
index aa84f0e3..d03fcdab 100644
--- a/src/main/java/teetime/stage/basic/merger/Merger.java
+++ b/src/main/java/teetime/stage/basic/merger/Merger.java
@@ -58,7 +58,7 @@ public final class Merger<T> extends AbstractStage {
 	public void executeWithPorts() {
 		final T token = this.strategy.getNextInput(this);
 		if (token == null) {
-			return;
+			returnNoElement();
 		}
 
 		outputPort.send(token);
diff --git a/src/performancetest/java/teetime/examples/experiment09/ChwWorkPerformanceCheck.java b/src/performancetest/java/teetime/examples/experiment09/ChwWorkPerformanceCheck.java
index ba4eda09..5359eddc 100644
--- a/src/performancetest/java/teetime/examples/experiment09/ChwWorkPerformanceCheck.java
+++ b/src/performancetest/java/teetime/examples/experiment09/ChwWorkPerformanceCheck.java
@@ -26,6 +26,8 @@ class ChwWorkPerformanceCheck extends AbstractPerformanceCheck {
 		// since 27.08.2014 (incl.)
 		// assertEquals(77, value9, 2.1); // +35
 		// since 14.10.2014 (incl.)
-		assertEquals(67, medianSpeedup, 3.1); // -10
+		// assertEquals(67, medianSpeedup, 3.1); // -10
+		// since 19.12.2014 (incl.)
+		assertEquals(53, medianSpeedup, 3.1); // -14
 	}
 }
diff --git a/src/performancetest/java/teetime/examples/experiment14/ChwWorkPerformanceCheck.java b/src/performancetest/java/teetime/examples/experiment14/ChwWorkPerformanceCheck.java
index b8ce7c10..b2d4deed 100644
--- a/src/performancetest/java/teetime/examples/experiment14/ChwWorkPerformanceCheck.java
+++ b/src/performancetest/java/teetime/examples/experiment14/ChwWorkPerformanceCheck.java
@@ -26,6 +26,8 @@ class ChwWorkPerformanceCheck extends AbstractPerformanceCheck {
 		// since 27.08.2014 (incl.)
 		// assertEquals(102, medianSpeedup, 5.1); // +16
 		// since 14.10.2014 (incl.)
-		assertEquals(81, medianSpeedup, 5.1); // -21
+		// assertEquals(81, medianSpeedup, 5.1); // -21
+		// since 19.12.2014 (incl.)
+		assertEquals(56, medianSpeedup, 5.1); // -25
 	}
 }
diff --git a/src/performancetest/java/teetime/examples/experiment16/ChwWorkPerformanceCheck.java b/src/performancetest/java/teetime/examples/experiment16/ChwWorkPerformanceCheck.java
index c95c13d1..797f2fef 100644
--- a/src/performancetest/java/teetime/examples/experiment16/ChwWorkPerformanceCheck.java
+++ b/src/performancetest/java/teetime/examples/experiment16/ChwWorkPerformanceCheck.java
@@ -1,9 +1,9 @@
 package teetime.examples.experiment16;
 
 import static org.junit.Assert.assertEquals;
+import util.test.AbstractProfiledPerformanceAssertion;
 import util.test.PerformanceResult;
 import util.test.PerformanceTest;
-import util.test.AbstractProfiledPerformanceAssertion;
 
 class ChwWorkPerformanceCheck extends AbstractProfiledPerformanceAssertion {
 
@@ -23,7 +23,9 @@ class ChwWorkPerformanceCheck extends AbstractProfiledPerformanceAssertion {
 		System.out.println("speedupC: " + speedupC);
 
 		assertEquals(2, speedupB, 0.3);
-		assertEquals(2.5, speedupC, 0.3);
+		// assertEquals(2.5, speedupC, 0.3);
+		// since 19.12.2014
+		assertEquals(2.0, speedupC, 0.3);
 	}
 
 	@Override
diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTest.java b/src/test/java/teetime/framework/RunnableConsumerStageTest.java
index ab2f75b0..02335317 100644
--- a/src/test/java/teetime/framework/RunnableConsumerStageTest.java
+++ b/src/test/java/teetime/framework/RunnableConsumerStageTest.java
@@ -9,6 +9,8 @@ import org.junit.Test;
 
 import teetime.util.Pair;
 
+import com.google.common.base.Joiner;
+
 public class RunnableConsumerStageTest {
 
 	@Test
@@ -68,8 +70,8 @@ public class RunnableConsumerStageTest {
 	private void start(final Analysis analysis) {
 		Collection<Pair<Thread, Throwable>> exceptions = analysis.start();
 		for (Pair<Thread, Throwable> pair : exceptions) {
-			// System.out.println(pair.getSecond());
-			// System.out.println(Joiner.on("\n").join(pair.getSecond().getStackTrace()));
+			System.err.println(pair.getSecond());
+			System.err.println(Joiner.on("\n").join(pair.getSecond().getStackTrace()));
 			throw new RuntimeException(pair.getSecond());
 		}
 		assertEquals(0, exceptions.size());
diff --git a/src/test/resources/logback.xml b/src/test/resources/logback-test.xml
similarity index 88%
rename from src/test/resources/logback.xml
rename to src/test/resources/logback-test.xml
index 8a46d9dc..5f80ba17 100644
--- a/src/test/resources/logback.xml
+++ b/src/test/resources/logback-test.xml
@@ -20,8 +20,8 @@
 		</encoder>
 	</appender>
 	
-<!-- 	<logger name="teetime.framework" level="TRACE" /> -->
-<!-- 	<logger name="teetime.stage" level="TRACE" /> -->
+	<logger name="teetime.framework" level="TRACE" />
+	<logger name="teetime.stage" level="TRACE" />
 	<logger name="teetime" level="INFO" />
 	<logger name="util" level="INFO" />
 
-- 
GitLab