From c70f660d3c81860b2815289828da2835737c4a6c Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Wed, 27 Aug 2014 17:13:25 +0200
Subject: [PATCH] pipe calls the target stage from now on if it is an intra
 pipe

---
 .../framework/core/AbstractStage.java         | 10 ++++----
 .../framework/core/OutputPort.java            | 17 ++++++++-----
 .../framework/core/pipe/AbstractPipe.java     | 20 ++++++++--------
 .../framework/core/pipe/DummyPipe.java        |  5 ++++
 .../framework/core/pipe/IPipe.java            |  2 ++
 .../framework/core/pipe/IntraThreadPipe.java  |  7 +++++-
 .../framework/core/pipe/SpScPipe.java         |  5 ++++
 .../ChwWorkComparisonMethodcallWithPorts.java |  8 +++----
 .../MethodCallThroughputAnalysis14.java       | 24 ++++++++++++-------
 9 files changed, 64 insertions(+), 34 deletions(-)

diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
index 80bfd8df..ab8a4be9 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java
@@ -48,11 +48,13 @@ public abstract class AbstractStage implements StageWithPort {
 			return false;
 		}
 
-		StageWithPort next = outputPort.getCachedTargetStage();
+		outputPort.reportNewElement();
 
-		do {
-			next.executeWithPorts(); // PERFORMANCE use the return value as indicator for re-schedulability instead
-		} while (next.isReschedulable());
+		// StageWithPort next = outputPort.getCachedTargetStage();
+		//
+		// do {
+		// next.executeWithPorts(); // PERFORMANCE use the return value as indicator for re-schedulability instead
+		// } while (next.isReschedulable());
 
 		return true;
 	}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
index 97d66d05..3ce89029 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java
@@ -2,7 +2,7 @@ package teetime.variant.methodcallWithPorts.framework.core;
 
 import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
 
-public class OutputPort<T> extends AbstractPort<T> {
+public final class OutputPort<T> extends AbstractPort<T> {
 
 	/**
 	 * Performance cache: Avoids the following method chain
@@ -11,7 +11,7 @@ public class OutputPort<T> extends AbstractPort<T> {
 	 * this.getPipe().getTargetPort().getOwningStage()
 	 * </pre>
 	 */
-	private StageWithPort cachedTargetStage;
+	// private StageWithPort cachedTargetStage;
 
 	OutputPort() {
 		super();
@@ -26,16 +26,21 @@ public class OutputPort<T> extends AbstractPort<T> {
 		return this.pipe.add(element);
 	}
 
-	public StageWithPort getCachedTargetStage() {
-		return this.cachedTargetStage;
-	}
+	// public StageWithPort getCachedTargetStage() {
+	// return this.cachedTargetStage;
+	// }
 
+	@Deprecated
 	public void setCachedTargetStage(final StageWithPort cachedTargetStage) {
-		this.cachedTargetStage = cachedTargetStage;
+		// this.cachedTargetStage = cachedTargetStage;
 	}
 
 	public void sendSignal(final Signal signal) {
 		this.pipe.setSignal(signal);
 	}
 
+	public void reportNewElement() {
+		this.pipe.reportNewElement();
+	}
+
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java
index 386bf62b..e914afef 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java
@@ -1,21 +1,20 @@
 package teetime.variant.methodcallWithPorts.framework.core.pipe;
 
 import teetime.variant.methodcallWithPorts.framework.core.InputPort;
+import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
 
 public abstract class AbstractPipe<T> implements IPipe<T> {
 
-	// private final AtomicBoolean closed = new AtomicBoolean();
 	private InputPort<T> targetPort;
 
-	// @Override
-	// public boolean isClosed() {
-	// return this.closed.get();
-	// }
-	//
-	// @Override
-	// public void close() {
-	// this.closed.lazySet(true); // lazySet is legal due to our single-writer requirement
-	// }
+	/**
+	 * Performance cache: Avoids the following method chain
+	 *
+	 * <pre>
+	 * this.getPipe().getTargetPort().getOwningStage()
+	 * </pre>
+	 */
+	protected StageWithPort cachedTargetStage;
 
 	@Override
 	public InputPort<T> getTargetPort() {
@@ -25,6 +24,7 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
 	@Override
 	public void setTargetPort(final InputPort<T> targetPort) {
 		this.targetPort = targetPort;
+		this.cachedTargetStage = targetPort.getOwningStage();
 	}
 
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java
index 498ada08..4c8c195f 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java
@@ -52,4 +52,9 @@ public final class DummyPipe implements IPipe {
 	@Override
 	public void connectPorts(final OutputPort sourcePort, final InputPort targetPort) {}
 
+	@Override
+	public void reportNewElement() {
+		// do nothing
+	}
+
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java
index a5f25199..7362f6fd 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java
@@ -29,4 +29,6 @@ public interface IPipe<T> {
 	// BETTER change signature to allow {OutputPort<T>, OutputPort<A0 extends T>, OutputPort<A1 extends T>, ...}
 	void connectPorts(OutputPort<T> sourcePort, InputPort<T> targetPort);
 
+	void reportNewElement();
+
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java
index 116d2903..1e78c8b4 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java
@@ -7,8 +7,13 @@ public abstract class IntraThreadPipe<T> extends AbstractPipe<T> {
 	@Override
 	public void setSignal(final Signal signal) {
 		if (this.getTargetPort() != null) {
-			this.getTargetPort().getOwningStage().onSignal(signal, this.getTargetPort());
+			this.cachedTargetStage.onSignal(signal, this.getTargetPort());
 		}
 	}
 
+	@Override
+	public final void reportNewElement() {
+		this.cachedTargetStage.executeWithPorts();
+	}
+
 }
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
index 8ceb2927..22b5c6a4 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
@@ -83,4 +83,9 @@ public class SpScPipe<T> extends AbstractPipe<T> {
 		return this.signal.get();
 	}
 
+	@Override
+	public void reportNewElement() {
+		// do nothing
+	}
+
 }
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/ChwWorkComparisonMethodcallWithPorts.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/ChwWorkComparisonMethodcallWithPorts.java
index 0dd3840c..e8c6d799 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/ChwWorkComparisonMethodcallWithPorts.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/ChwWorkComparisonMethodcallWithPorts.java
@@ -89,10 +89,10 @@ public class ChwWorkComparisonMethodcallWithPorts implements PerformanceCheckPro
 		// assertEquals(53, value17, 4.1); // +0
 
 		// since 27.08.2014 (incl.)
-		assertEquals(112, value14, 5.1); // +16
-		assertEquals(42, value10, 2.1); // +16
-		assertEquals(41, value11, 4.1); // -3
-		assertEquals(42, value9, 2.1); // +6
+		assertEquals(102, value14, 5.1); // +16
+		assertEquals(56, value10, 2.1); // +30
+		assertEquals(64, value11, 4.1); // +15
+		assertEquals(77, value9, 2.1); // +35
 		assertEquals(44, value15, 4.1); // +0
 		assertEquals(53, value17, 4.1); // +0
 
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java
index e62d0572..8f50cbe1 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java
@@ -23,7 +23,9 @@ import teetime.variant.explicitScheduling.framework.core.Analysis;
 import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
 import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
 import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
-import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
 import teetime.variant.methodcallWithPorts.stage.CollectorSink;
 import teetime.variant.methodcallWithPorts.stage.NoopFilter;
 import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
@@ -32,13 +34,11 @@ import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter;
 
 /**
  * @author Christian Wulf
- * 
+ *
  * @since 1.10
  */
 public class MethodCallThroughputAnalysis14 extends Analysis {
 
-	private static final int SPSC_INITIAL_CAPACITY = 4;
-
 	private long numInputObjects;
 	private ConstructorClosure<TimestampObject> inputObjectCreator;
 	private int numNoopFilters;
@@ -75,13 +75,19 @@ public class MethodCallThroughputAnalysis14 extends Analysis {
 		pipeline.addIntermediateStage(stopTimestampFilter);
 		pipeline.setLastStage(collectorSink);
 
-		SpScPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort(), SPSC_INITIAL_CAPACITY);
-		SpScPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort(), SPSC_INITIAL_CAPACITY);
+		PipeFactory pipeFactory = new PipeFactory();
+		IPipe<TimestampObject> pipe = pipeFactory.create(ThreadCommunication.INTRA);
+		pipe.connectPorts(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
+		pipe = pipeFactory.create(ThreadCommunication.INTRA);
+		pipe.connectPorts(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
 		for (int i = 0; i < noopFilters.length - 1; i++) {
-			SpScPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort(), SPSC_INITIAL_CAPACITY);
+			pipe = pipeFactory.create(ThreadCommunication.INTRA);
+			pipe.connectPorts(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
 		}
-		SpScPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort(), SPSC_INITIAL_CAPACITY);
-		SpScPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort(), SPSC_INITIAL_CAPACITY);
+		pipe = pipeFactory.create(ThreadCommunication.INTRA);
+		pipe.connectPorts(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
+		pipe = pipeFactory.create(ThreadCommunication.INTRA);
+		pipe.connectPorts(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
 
 		return pipeline;
 	}
-- 
GitLab