From 3645dd8064727e2a7679a2cab07c759e8adaf123 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Sat, 14 Jun 2014 23:55:46 +0200
Subject: [PATCH] added end signal strategy with minimal performance loss

---
 .../list/CommittableResizableArrayQueue.java  | 14 +--
 .../throughput/methodcall/AbstractStage.java  | 89 +++++++++++++++----
 .../throughput/methodcall/CollectorSink.java  |  2 +-
 .../throughput/methodcall/ConsumerStage.java  | 18 ++++
 .../MethodCallThroughputAnalysis2.java        |  2 +-
 .../throughput/methodcall/NoopFilter.java     |  2 +-
 .../throughput/methodcall/ObjectProducer.java |  3 +-
 .../methodcall/OnDisableListener.java         |  6 ++
 .../throughput/methodcall/Pipeline.java       | 84 ++++++++++++++++-
 .../throughput/methodcall/ProducerStage.java  | 18 ++++
 .../methodcall/SchedulingInformation.java     | 14 +++
 .../examples/throughput/methodcall/Stage.java |  8 ++
 .../methodcall/StartTimestampFilter.java      |  2 +-
 .../methodcall/StopTimestampFilter.java       |  2 +-
 14 files changed, 229 insertions(+), 35 deletions(-)
 create mode 100644 src/test/java/teetime/examples/throughput/methodcall/ConsumerStage.java
 create mode 100644 src/test/java/teetime/examples/throughput/methodcall/OnDisableListener.java
 create mode 100644 src/test/java/teetime/examples/throughput/methodcall/ProducerStage.java
 create mode 100644 src/test/java/teetime/examples/throughput/methodcall/SchedulingInformation.java

diff --git a/src/main/java/teetime/util/list/CommittableResizableArrayQueue.java b/src/main/java/teetime/util/list/CommittableResizableArrayQueue.java
index a13d43b7..14fab9a8 100644
--- a/src/main/java/teetime/util/list/CommittableResizableArrayQueue.java
+++ b/src/main/java/teetime/util/list/CommittableResizableArrayQueue.java
@@ -21,24 +21,24 @@ public class CommittableResizableArrayQueue<T> implements CommittableQueue<T> {
 	}
 
 	@Override
-	public T get(final int index) {
+	public final T get(final int index) {
 		T element = this.elements[index + 1];
 		return element;
 	}
 
 	@Override
 	public void addToTailUncommitted(final T element) {
-		if (this.lastFreeIndexUncommitted == this.capacity()) {
-			this.grow();
-		}
+		// if (this.lastFreeIndexUncommitted == this.capacity()) { // TODO uncomment
+		// this.grow();
+		// }
 		this.put(this.lastFreeIndexUncommitted++, element);
 	}
 
 	@Override
 	public T removeFromHeadUncommitted() {
-		if (this.capacity() > this.MIN_CAPACITY && this.lastFreeIndexUncommitted < this.capacity() / 2) {
-			this.shrink();
-		}
+		// if (this.capacity() > this.MIN_CAPACITY && this.lastFreeIndexUncommitted < this.capacity() / 2) { // TODO uncomment
+		// this.shrink();
+		// }
 		T element = this.get(--this.lastFreeIndexUncommitted);
 		return element;
 	}
diff --git a/src/test/java/teetime/examples/throughput/methodcall/AbstractStage.java b/src/test/java/teetime/examples/throughput/methodcall/AbstractStage.java
index 85bb41dc..83a53368 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/AbstractStage.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/AbstractStage.java
@@ -3,12 +3,20 @@ package teetime.examples.throughput.methodcall;
 import teetime.util.list.CommittableQueue;
 import teetime.util.list.CommittableResizableArrayQueue;
 
-public abstract class AbstractStage<I, O> implements Stage<I, O> {
+abstract class AbstractStage<I, O> implements Stage<I, O> {
 
 	// private final InputPort<I> inputPort = new InputPort<I>();
 	// private final OutputPort<O> outputPort = new OutputPort<O>();
 
-	CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4);
+	protected final CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4);
+
+	private final SchedulingInformation schedulingInformation = new SchedulingInformation();
+
+	private Stage parentStage;
+
+	private OnDisableListener listener;
+
+	private int index;
 
 	// @Override
 	// public InputPort<I> getInputPort() {
@@ -21,24 +29,35 @@ public abstract class AbstractStage<I, O> implements Stage<I, O> {
 	// }
 
 	@Override
-	public final CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
-
+	public CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
 		// pass through the end signal
 		// InputPort<I> port = this.getInputPort();
-		if (elements != null) {
-			// I element = port.read();
-			I element = elements.getTail();
-			if (element == END_SIGNAL) {
-				this.send((O) END_SIGNAL);
-			} else {
-				// elements = this.getInputPort().pipe.getElements();
-				this.execute4(elements);
-			}
-		} else {
-			throw new IllegalStateException();
-		}
+		// if (elements != null) {
+		// // I element = port.read();
+		// // I element = elements.getTail();
+		// // if (element == END_SIGNAL) {
+		// // this.send((O) END_SIGNAL);
+		// // } else {
+		// // // elements = this.getInputPort().pipe.getElements();
+		// // }
+		//
+		// this.execute4(elements);
+		// } else {
+		// throw new IllegalStateException();
+		// }
+
+		// boolean inputIsEmpty = elements.isEmpty();
+
+		this.execute4(elements);
 
 		this.outputElements.commit();
+
+		// boolean outputIsEmpty = this.outputElements.isEmpty();
+		//
+		// if (inputIsEmpty && outputIsEmpty) {
+		// this.disable();
+		// }
+
 		return this.outputElements;
 	}
 
@@ -46,7 +65,43 @@ public abstract class AbstractStage<I, O> implements Stage<I, O> {
 
 	protected abstract void execute4(CommittableQueue<I> elements);
 
-	void send(final O element) {
+	protected final void send(final O element) {
 		this.outputElements.addToTailUncommitted(element);
 	}
+
+	@Override
+	public SchedulingInformation getSchedulingInformation() {
+		return this.schedulingInformation;
+	}
+
+	public void disable() {
+		this.schedulingInformation.setActive(false);
+		this.fireOnDisable();
+	}
+
+	private void fireOnDisable() {
+		if (this.listener != null) {
+			this.listener.onDisable(this, this.index);
+		}
+	}
+
+	@Override
+	public Stage getParentStage() {
+		return this.parentStage;
+	}
+
+	@Override
+	public void setParentStage(final Stage parentStage, final int index) {
+		this.index = index;
+		this.parentStage = parentStage;
+	}
+
+	public OnDisableListener getListener() {
+		return this.listener;
+	}
+
+	public void setListener(final OnDisableListener listener) {
+		this.listener = listener;
+	}
+
 }
diff --git a/src/test/java/teetime/examples/throughput/methodcall/CollectorSink.java b/src/test/java/teetime/examples/throughput/methodcall/CollectorSink.java
index e225181d..f8195ad7 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/CollectorSink.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/CollectorSink.java
@@ -24,7 +24,7 @@ import teetime.util.list.CommittableQueue;
  * 
  * @since 1.10
  */
-public class CollectorSink<T> extends AbstractStage<T, Void> {
+public class CollectorSink<T> extends ConsumerStage<T, Void> {
 
 	private static final int THRESHOLD = 10000;
 
diff --git a/src/test/java/teetime/examples/throughput/methodcall/ConsumerStage.java b/src/test/java/teetime/examples/throughput/methodcall/ConsumerStage.java
new file mode 100644
index 00000000..0948612d
--- /dev/null
+++ b/src/test/java/teetime/examples/throughput/methodcall/ConsumerStage.java
@@ -0,0 +1,18 @@
+package teetime.examples.throughput.methodcall;
+
+import teetime.util.list.CommittableQueue;
+
+public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
+
+	@Override
+	public CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
+		boolean inputIsEmpty = elements.isEmpty();
+		if (inputIsEmpty) {
+			this.disable();
+			return this.outputElements;
+		}
+
+		return super.execute2(elements);
+	}
+
+}
diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis2.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis2.java
index 0c34dc3f..218db1bc 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis2.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis2.java
@@ -78,7 +78,7 @@ public class MethodCallThroughputAnalysis2 extends Analysis {
 				CommittableQueue<Void> inputQueue = new CommittableResizableArrayQueue<Void>(null, 0);
 				CommittableQueue<Void> outputQueue = new CommittableResizableArrayQueue<Void>(null, 0);
 
-				while (!(outputQueue.getTail() == Stage.END_SIGNAL)) {
+				while (pipeline.getSchedulingInformation().isActive()) {
 					outputQueue = pipeline.execute2(inputQueue);
 				}
 			}
diff --git a/src/test/java/teetime/examples/throughput/methodcall/NoopFilter.java b/src/test/java/teetime/examples/throughput/methodcall/NoopFilter.java
index 0012b0bb..896de27d 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/NoopFilter.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/NoopFilter.java
@@ -22,7 +22,7 @@ import teetime.util.list.CommittableQueue;
  * 
  * @since 1.10
  */
-public class NoopFilter<T> extends AbstractStage<T, T> {
+public class NoopFilter<T> extends ConsumerStage<T, T> {
 
 	public T execute(final T obj) {
 		return obj;
diff --git a/src/test/java/teetime/examples/throughput/methodcall/ObjectProducer.java b/src/test/java/teetime/examples/throughput/methodcall/ObjectProducer.java
index 6782445f..96424548 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/ObjectProducer.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/ObjectProducer.java
@@ -24,7 +24,7 @@ import teetime.util.list.CommittableQueue;
  * 
  * @since 1.10
  */
-public class ObjectProducer<T> extends AbstractStage<Void, T> {
+public class ObjectProducer<T> extends ProducerStage<Void, T> {
 
 	private long numInputObjects;
 	private Callable<T> inputObjectCreator;
@@ -88,7 +88,6 @@ public class ObjectProducer<T> extends AbstractStage<Void, T> {
 	@Override
 	protected void execute4(final CommittableQueue<Void> elements) {
 		if (this.numInputObjects == 0) {
-			this.send((T) END_SIGNAL);
 			return;
 		}
 
diff --git a/src/test/java/teetime/examples/throughput/methodcall/OnDisableListener.java b/src/test/java/teetime/examples/throughput/methodcall/OnDisableListener.java
new file mode 100644
index 00000000..483733ea
--- /dev/null
+++ b/src/test/java/teetime/examples/throughput/methodcall/OnDisableListener.java
@@ -0,0 +1,6 @@
+package teetime.examples.throughput.methodcall;
+
+public interface OnDisableListener {
+
+	void onDisable(Stage stage, int index);
+}
diff --git a/src/test/java/teetime/examples/throughput/methodcall/Pipeline.java b/src/test/java/teetime/examples/throughput/methodcall/Pipeline.java
index 1ecc4fde..d44a9160 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/Pipeline.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/Pipeline.java
@@ -6,12 +6,20 @@ import java.util.List;
 
 import teetime.util.list.CommittableQueue;
 
-public class Pipeline<I, O> implements Stage<I, O> {
+public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
 
 	private Stage firstStage;
 	private final List<Stage> intermediateStages = new LinkedList<Stage>();
 	private Stage lastStage;
 
+	private final SchedulingInformation schedulingInformation = new SchedulingInformation();
+
+	private Stage[] stages;
+	private Stage parentStage;
+	private int index;
+	private int startIndex;
+	private OnDisableListener listener;
+
 	void setFirstStage(final Stage<I, ?> stage) {
 		this.firstStage = stage;
 	}
@@ -30,11 +38,19 @@ public class Pipeline<I, O> implements Stage<I, O> {
 
 	@Override
 	public CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
-		CommittableQueue queue = this.firstStage.execute2(elements);
-		for (Stage<?, ?> stage : this.intermediateStages) {
+		// CommittableQueue queue = this.firstStage.execute2(elements);
+		// for (Stage<?, ?> stage : this.intermediateStages) {
+		// queue = stage.execute2(queue);
+		// }
+		// return this.lastStage.execute2(queue);
+
+		// below is faster than above (probably because of the instantiation of a list iterator in each (!) execution)
+		CommittableQueue queue = elements;
+		for (int i = this.startIndex; i < this.stages.length; i++) {
+			Stage<?, ?> stage = this.stages[i];
 			queue = stage.execute2(queue);
 		}
-		return this.lastStage.execute2(queue);
+		return queue;
 	}
 
 	void onStart() {
@@ -58,13 +74,73 @@ public class Pipeline<I, O> implements Stage<I, O> {
 		// pipe = new Pipe();
 		// this.intermediateStages.get(this.intermediateStages.size() - 1).getOutputPort().pipe = pipe;
 		// this.lastStage.getInputPort().pipe = pipe;
+
+		int size = 1 + this.intermediateStages.size() + 1;
+		this.stages = new Stage[size];
+		this.stages[0] = this.firstStage;
+		for (int i = 0; i < this.intermediateStages.size(); i++) {
+			Stage<?, ?> stage = this.intermediateStages.get(i);
+			this.stages[1 + i] = stage;
+		}
+		this.stages[this.stages.length - 1] = this.lastStage;
+
+		for (int i = 0; i < this.stages.length; i++) {
+			Stage<?, ?> stage = this.stages[i];
+			stage.setParentStage(this, i);
+			stage.setListener(this);
+		}
 	}
+
 	//
 	// @Override
 	// public InputPort getInputPort() {
 	// return this.firstStage.getInputPort();
 	// }
 
+	@Override
+	public SchedulingInformation getSchedulingInformation() {
+		return this.schedulingInformation;
+	}
+
+	@Override
+	public Stage getParentStage() {
+		return this.parentStage;
+	}
+
+	@Override
+	public void setParentStage(final Stage parentStage, final int index) {
+		this.index = index;
+		this.parentStage = parentStage;
+	}
+
+	@Override
+	public void onDisable(final Stage stage, final int index) {
+		this.startIndex = index + 1;
+		if (this.startIndex == this.stages.length) {
+			this.disable();
+		}
+	}
+
+	public void disable() {
+		this.schedulingInformation.setActive(false);
+		this.fireOnDisable();
+	}
+
+	private void fireOnDisable() {
+		if (this.listener != null) {
+			this.listener.onDisable(this, this.index);
+		}
+	}
+
+	public OnDisableListener getListener() {
+		return this.listener;
+	}
+
+	@Override
+	public void setListener(final OnDisableListener listener) {
+		this.listener = listener;
+	}
+
 	// @Override
 	// public OutputPort getOutputPort() {
 	// return this.lastStage.getOutputPort();
diff --git a/src/test/java/teetime/examples/throughput/methodcall/ProducerStage.java b/src/test/java/teetime/examples/throughput/methodcall/ProducerStage.java
new file mode 100644
index 00000000..be0f81e4
--- /dev/null
+++ b/src/test/java/teetime/examples/throughput/methodcall/ProducerStage.java
@@ -0,0 +1,18 @@
+package teetime.examples.throughput.methodcall;
+
+import teetime.util.list.CommittableQueue;
+
+public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
+
+	@Override
+	public CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
+		CommittableQueue<O> outputElements = super.execute2(elements);
+
+		boolean outputIsEmpty = outputElements.isEmpty();
+		if (outputIsEmpty) {
+			this.disable();
+		}
+
+		return outputElements;
+	}
+}
diff --git a/src/test/java/teetime/examples/throughput/methodcall/SchedulingInformation.java b/src/test/java/teetime/examples/throughput/methodcall/SchedulingInformation.java
new file mode 100644
index 00000000..0286f43b
--- /dev/null
+++ b/src/test/java/teetime/examples/throughput/methodcall/SchedulingInformation.java
@@ -0,0 +1,14 @@
+package teetime.examples.throughput.methodcall;
+
+public class SchedulingInformation {
+
+	private boolean active = true;
+
+	public boolean isActive() {
+		return this.active;
+	}
+
+	public void setActive(final boolean active) {
+		this.active = active;
+	}
+}
diff --git a/src/test/java/teetime/examples/throughput/methodcall/Stage.java b/src/test/java/teetime/examples/throughput/methodcall/Stage.java
index e437606b..35f78cd3 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/Stage.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/Stage.java
@@ -12,5 +12,13 @@ public interface Stage<I, O> {
 
 	CommittableQueue<O> execute2(CommittableQueue<I> elements);
 
+	SchedulingInformation getSchedulingInformation();
+
 	// OutputPort<O> getOutputPort();
+
+	Stage getParentStage();
+
+	void setParentStage(Stage parentStage, int index);
+
+	void setListener(OnDisableListener listener);
 }
diff --git a/src/test/java/teetime/examples/throughput/methodcall/StartTimestampFilter.java b/src/test/java/teetime/examples/throughput/methodcall/StartTimestampFilter.java
index 7de7da5f..9e818304 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/StartTimestampFilter.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/StartTimestampFilter.java
@@ -23,7 +23,7 @@ import teetime.util.list.CommittableQueue;
  * 
  * @since 1.10
  */
-public class StartTimestampFilter extends AbstractStage<TimestampObject, TimestampObject> {
+public class StartTimestampFilter extends ConsumerStage<TimestampObject, TimestampObject> {
 
 	public TimestampObject execute(final TimestampObject obj) {
 		obj.setStartTimestamp(System.nanoTime());
diff --git a/src/test/java/teetime/examples/throughput/methodcall/StopTimestampFilter.java b/src/test/java/teetime/examples/throughput/methodcall/StopTimestampFilter.java
index 92fe8211..ee5b3baa 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/StopTimestampFilter.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/StopTimestampFilter.java
@@ -23,7 +23,7 @@ import teetime.util.list.CommittableQueue;
  * 
  * @since 1.10
  */
-public class StopTimestampFilter extends AbstractStage<TimestampObject, TimestampObject> {
+public class StopTimestampFilter extends ConsumerStage<TimestampObject, TimestampObject> {
 
 	public TimestampObject execute(final TimestampObject obj) {
 		obj.setStopTimestamp(System.nanoTime());
-- 
GitLab