From 22d6aae4c8aa3c2e6e124109df7441d79030d9df Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Sun, 15 Jun 2014 03:29:00 +0200
Subject: [PATCH] added recursive version of pipeline

---
 .../throughput/methodcall/AbstractStage.java  | 24 ++++++++++------
 .../throughput/methodcall/ConsumerStage.java  |  4 ++-
 .../MethodCallThroughputAnalysis2.java        |  2 +-
 .../MethodCallThroughputAnalysis8.java        |  4 ++-
 .../throughput/methodcall/ObjectProducer.java |  2 ++
 .../throughput/methodcall/Pipeline.java       | 28 ++++++++++++++++---
 .../throughput/methodcall/ProducerStage.java  |  5 ++++
 .../examples/throughput/methodcall/Stage.java |  2 ++
 8 files changed, 56 insertions(+), 15 deletions(-)

diff --git a/src/test/java/teetime/examples/throughput/methodcall/AbstractStage.java b/src/test/java/teetime/examples/throughput/methodcall/AbstractStage.java
index 18f4360..acd437a 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/AbstractStage.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/AbstractStage.java
@@ -20,6 +20,8 @@ abstract class AbstractStage<I, O> implements Stage<I, O> {
 
 	private Stage successor;
 
+	private boolean reschedulable;
+
 	// @Override
 	// public InputPort<I> getInputPort() {
 	// return this.inputPort;
@@ -48,18 +50,10 @@ abstract class AbstractStage<I, O> implements Stage<I, O> {
 		// throw new IllegalStateException();
 		// }
 
-		// boolean inputIsEmpty = elements.isEmpty();
-
 		this.execute4(elements);
 
 		this.outputElements.commit();
 
-		// boolean outputIsEmpty = this.outputElements.isEmpty();
-		//
-		// if (inputIsEmpty && outputIsEmpty) {
-		// this.disable();
-		// }
-
 		return this.outputElements;
 	}
 
@@ -69,6 +63,11 @@ abstract class AbstractStage<I, O> implements Stage<I, O> {
 
 	protected final void send(final O element) {
 		this.outputElements.addToTailUncommitted(element);
+
+		this.outputElements.commit();
+		do {
+			CommittableQueue execute = this.next().execute2(this.outputElements);
+		} while (this.next().isReschedulable());
 	}
 
 	@Override
@@ -117,4 +116,13 @@ abstract class AbstractStage<I, O> implements Stage<I, O> {
 		this.successor = successor;
 	}
 
+	@Override
+	public boolean isReschedulable() {
+		return this.reschedulable;
+	}
+
+	public void setReschedulable(final boolean reschedulable) {
+		this.reschedulable = reschedulable;
+	}
+
 }
diff --git a/src/test/java/teetime/examples/throughput/methodcall/ConsumerStage.java b/src/test/java/teetime/examples/throughput/methodcall/ConsumerStage.java
index 0948612..51ecdc3 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/ConsumerStage.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/ConsumerStage.java
@@ -12,7 +12,9 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
 			return this.outputElements;
 		}
 
-		return super.execute2(elements);
+		CommittableQueue<O> output = super.execute2(elements);
+		this.setReschedulable(!elements.isEmpty()); // costs ~1200 ns on chw-work
+		return output;
 	}
 
 }
diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis2.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis2.java
index c1c4f42..330bda0 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis2.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis2.java
@@ -80,7 +80,7 @@ public class MethodCallThroughputAnalysis2 extends Analysis {
 
 				do {
 					outputQueue = pipeline.execute2(inputQueue);
-				} while (pipeline.getSchedulingInformation().isActive());
+				} while (pipeline.getSchedulingInformation().isActive() && pipeline.isReschedulable());
 			}
 		};
 
diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis8.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis8.java
index 916727c..7c8fecc 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis8.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis8.java
@@ -75,11 +75,13 @@ public class MethodCallThroughputAnalysis8 extends Analysis {
 		final AbstractStage[] stages = stageList.toArray(new AbstractStage[0]);
 
 		final WrappingPipeline pipeline = new WrappingPipeline() {
+			private int startIndex;
+
 			@Override
 			public boolean execute() {
 				// using the foreach for arrays (i.e., w/o using an iterator variable) increases the performance from 200ms to 130ms
 				Object element = null;
-				for (int i = 0; i < stages.length; i++) {
+				for (int i = this.startIndex; i < stages.length; i++) {
 					Stage stage = stages[i];
 					element = stage.execute(element);
 					if (element == null) {
diff --git a/src/test/java/teetime/examples/throughput/methodcall/ObjectProducer.java b/src/test/java/teetime/examples/throughput/methodcall/ObjectProducer.java
index 7c4199d..db40957 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/ObjectProducer.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/ObjectProducer.java
@@ -40,6 +40,7 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> {
 	@Override
 	public T execute(final Object element) {
 		if (this.numInputObjects == 0) {
+			this.setReschedulable(false);
 			return null;
 		}
 
@@ -89,6 +90,7 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> {
 	@Override
 	protected void execute4(final CommittableQueue<Void> elements) {
 		if (this.numInputObjects == 0) {
+			this.setReschedulable(false);
 			return;
 		}
 
diff --git a/src/test/java/teetime/examples/throughput/methodcall/Pipeline.java b/src/test/java/teetime/examples/throughput/methodcall/Pipeline.java
index f0789f7..8be8637 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/Pipeline.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/Pipeline.java
@@ -20,6 +20,8 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
 	private int startIndex;
 	private OnDisableListener listener;
 
+	private boolean reschedulable;
+
 	void setFirstStage(final Stage<I, ?> stage) {
 		this.firstStage = stage;
 	}
@@ -46,10 +48,14 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
 
 		// below is faster than above (probably because of the instantiation of a list iterator in each (!) execution)
 		CommittableQueue queue = elements;
-		for (int i = this.startIndex; i < this.stages.length; i++) {
-			Stage<?, ?> stage = this.stages[i];
-			queue = stage.execute2(queue);
-		}
+
+		// for (int i = this.startIndex; i < this.stages.length; i++) {
+		// Stage<?, ?> stage = this.stages[i];
+		// queue = stage.execute2(queue);
+		// }
+
+		this.stages[0].execute2(elements);
+		this.setReschedulable(this.stages[0].isReschedulable());
 		return queue;
 	}
 
@@ -89,6 +95,11 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
 			stage.setParentStage(this, i);
 			stage.setListener(this);
 		}
+
+		for (int i = 0; i < this.stages.length - 1; i++) {
+			Stage<?, ?> stage = this.stages[i];
+			stage.setSuccessor(this.stages[i + 1]);
+		}
 	}
 
 	//
@@ -156,6 +167,15 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
 		throw new IllegalStateException();
 	}
 
+	@Override
+	public boolean isReschedulable() {
+		return this.reschedulable;
+	}
+
+	public void setReschedulable(final boolean reschedulable) {
+		this.reschedulable = reschedulable;
+	}
+
 	// @Override
 	// public OutputPort getOutputPort() {
 	// return this.lastStage.getOutputPort();
diff --git a/src/test/java/teetime/examples/throughput/methodcall/ProducerStage.java b/src/test/java/teetime/examples/throughput/methodcall/ProducerStage.java
index be0f81e..262119d 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/ProducerStage.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/ProducerStage.java
@@ -4,6 +4,10 @@ import teetime.util.list.CommittableQueue;
 
 public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
 
+	public ProducerStage() {
+		this.setReschedulable(true);
+	}
+
 	@Override
 	public CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
 		CommittableQueue<O> outputElements = super.execute2(elements);
@@ -15,4 +19,5 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
 
 		return outputElements;
 	}
+
 }
diff --git a/src/test/java/teetime/examples/throughput/methodcall/Stage.java b/src/test/java/teetime/examples/throughput/methodcall/Stage.java
index d9c0670..10eefc4 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/Stage.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/Stage.java
@@ -27,4 +27,6 @@ public interface Stage<I, O> {
 	Stage next();
 
 	void setSuccessor(Stage<?, ?> successor);
+
+	boolean isReschedulable();
 }
-- 
GitLab