From 9675d39439c1f3206b56fc4811fcf759e138f46c Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Fri, 17 Jul 2015 12:24:03 +0200
Subject: [PATCH] #207 added capacity to IPipe

---
 .../teetime/framework/AbstractInterThreadPipe.java |  4 ++--
 .../teetime/framework/AbstractIntraThreadPipe.java |  4 ++--
 src/main/java/teetime/framework/AbstractPipe.java  | 14 +++++++++++---
 .../teetime/framework/ExecutionInstantiation.java  |  4 ++--
 .../java/teetime/framework/pipe/DummyPipe.java     |  5 +++++
 .../teetime/framework/pipe/IMonitorablePipe.java   |  2 ++
 src/main/java/teetime/framework/pipe/IPipe.java    |  7 ++++++-
 .../teetime/framework/pipe/InstantiationPipe.java  |  3 ++-
 .../java/teetime/framework/pipe/RelayTestPipe.java |  2 +-
 .../teetime/framework/pipe/SingleElementPipe.java  |  2 +-
 src/main/java/teetime/framework/pipe/SpScPipe.java |  2 +-
 .../teetime/framework/pipe/UnboundedSpScPipe.java  |  2 +-
 .../stage/basic/merger/MergerTestingPipe.java      |  5 +++++
 13 files changed, 41 insertions(+), 15 deletions(-)

diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java
index 3336d636..8786ef85 100644
--- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java
+++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java
@@ -38,8 +38,8 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe {
 
 	private volatile boolean closed;
 
-	protected <T> AbstractInterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
-		super(sourcePort, targetPort);
+	protected <T> AbstractInterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		super(sourcePort, targetPort, capacity);
 		final Queue<ISignal> localSignalQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
 		final PutStrategy<ISignal> putStrategy = new YieldPutStrategy<ISignal>();
 		final TakeStrategy<ISignal> takeStrategy = new SCParkTakeStrategy<ISignal>();
diff --git a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java
index 8108cd5d..a12acf71 100644
--- a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java
+++ b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java
@@ -21,8 +21,8 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe {
 
 	private boolean closed;
 
-	protected <T> AbstractIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
-		super(sourcePort, targetPort);
+	protected <T> AbstractIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		super(sourcePort, targetPort, capacity);
 	}
 
 	@Override
diff --git a/src/main/java/teetime/framework/AbstractPipe.java b/src/main/java/teetime/framework/AbstractPipe.java
index 19df8e3a..2a6ea3bb 100644
--- a/src/main/java/teetime/framework/AbstractPipe.java
+++ b/src/main/java/teetime/framework/AbstractPipe.java
@@ -30,8 +30,10 @@ public abstract class AbstractPipe implements IPipe {
 
 	private final OutputPort<?> sourcePort;
 	private final InputPort<?> targetPort;
+	@SuppressWarnings("PMD.AvoidFieldNameMatchingMethodName")
+	private final int capacity;
 
-	protected <T> AbstractPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+	protected <T> AbstractPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
 		if (sourcePort == null) {
 			throw new IllegalArgumentException("sourcePort may not be null");
 		}
@@ -44,16 +46,17 @@ public abstract class AbstractPipe implements IPipe {
 
 		this.sourcePort = sourcePort;
 		this.targetPort = targetPort;
+		this.capacity = capacity;
 		this.cachedTargetStage = targetPort.getOwningStage();
 	}
 
 	@Override
-	public OutputPort<?> getSourcePort() {
+	public final OutputPort<?> getSourcePort() {
 		return sourcePort;
 	}
 
 	@Override
-	public InputPort<?> getTargetPort() {
+	public final InputPort<?> getTargetPort() {
 		return targetPort;
 	}
 
@@ -61,4 +64,9 @@ public abstract class AbstractPipe implements IPipe {
 	public final boolean hasMore() {
 		return !isEmpty();
 	}
+
+	@Override
+	public final int capacity() {
+		return capacity;
+	}
 }
diff --git a/src/main/java/teetime/framework/ExecutionInstantiation.java b/src/main/java/teetime/framework/ExecutionInstantiation.java
index cce003ef..63a57c3d 100644
--- a/src/main/java/teetime/framework/ExecutionInstantiation.java
+++ b/src/main/java/teetime/framework/ExecutionInstantiation.java
@@ -72,8 +72,8 @@ class ExecutionInstantiation {
 		}
 
 		if (threadableStages.contains(targetStage) && targetColor != color) {
-			if (pipe.getCapacity() != 0) {
-				interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.getCapacity());
+			if (pipe.capacity() != 0) {
+				interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.capacity());
 			} else {
 				interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4);
 			}
diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java
index c67015f1..8381869e 100644
--- a/src/main/java/teetime/framework/pipe/DummyPipe.java
+++ b/src/main/java/teetime/framework/pipe/DummyPipe.java
@@ -95,4 +95,9 @@ public final class DummyPipe implements IPipe {
 
 	}
 
+	@Override
+	public int capacity() {
+		return 0;
+	}
+
 }
diff --git a/src/main/java/teetime/framework/pipe/IMonitorablePipe.java b/src/main/java/teetime/framework/pipe/IMonitorablePipe.java
index 0ca5d9c3..db61ad14 100644
--- a/src/main/java/teetime/framework/pipe/IMonitorablePipe.java
+++ b/src/main/java/teetime/framework/pipe/IMonitorablePipe.java
@@ -23,6 +23,8 @@ public interface IMonitorablePipe {
 
 	int size();
 
+	int capacity();
+
 	long getPushThroughput();
 
 	long getPullThroughput();
diff --git a/src/main/java/teetime/framework/pipe/IPipe.java b/src/main/java/teetime/framework/pipe/IPipe.java
index a28a3419..de1cd908 100644
--- a/src/main/java/teetime/framework/pipe/IPipe.java
+++ b/src/main/java/teetime/framework/pipe/IPipe.java
@@ -50,10 +50,15 @@ public interface IPipe {
 	boolean isEmpty();
 
 	/**
-	 * @return the current number of elements
+	 * @return the current number of elements held by this pipe instance
 	 */
 	int size();
 
+	/**
+	 * @return the maximum number of elements possible to hold by this pipe instance
+	 */
+	int capacity();
+
 	/**
 	 * Retrieves the last element of the pipe and deletes it.
 	 *
diff --git a/src/main/java/teetime/framework/pipe/InstantiationPipe.java b/src/main/java/teetime/framework/pipe/InstantiationPipe.java
index de753e9e..ab02f62d 100644
--- a/src/main/java/teetime/framework/pipe/InstantiationPipe.java
+++ b/src/main/java/teetime/framework/pipe/InstantiationPipe.java
@@ -35,7 +35,8 @@ public class InstantiationPipe implements IPipe {
 		targetPort.setPipe(this);
 	}
 
-	public int getCapacity() {
+	@Override
+	public int capacity() {
 		return capacity;
 	}
 
diff --git a/src/main/java/teetime/framework/pipe/RelayTestPipe.java b/src/main/java/teetime/framework/pipe/RelayTestPipe.java
index 1922e40d..20e99adb 100644
--- a/src/main/java/teetime/framework/pipe/RelayTestPipe.java
+++ b/src/main/java/teetime/framework/pipe/RelayTestPipe.java
@@ -24,7 +24,7 @@ final class RelayTestPipe<T> extends AbstractInterThreadPipe {
 	private final ConstructorClosure<T> inputObjectCreator;
 
 	public RelayTestPipe(final int numInputObjects, final ConstructorClosure<T> inputObjectCreator) {
-		super(null, null);
+		super(null, null, Integer.MAX_VALUE);
 		this.numInputObjects = numInputObjects;
 		this.inputObjectCreator = inputObjectCreator;
 	}
diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipe.java b/src/main/java/teetime/framework/pipe/SingleElementPipe.java
index 108a01f8..655b9b5f 100644
--- a/src/main/java/teetime/framework/pipe/SingleElementPipe.java
+++ b/src/main/java/teetime/framework/pipe/SingleElementPipe.java
@@ -24,7 +24,7 @@ final class SingleElementPipe extends AbstractIntraThreadPipe {
 	private Object element;
 
 	<T> SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
-		super(sourcePort, targetPort);
+		super(sourcePort, targetPort, 1);
 	}
 
 	@Override
diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java
index 6fc9c350..eb58b9e0 100644
--- a/src/main/java/teetime/framework/pipe/SpScPipe.java
+++ b/src/main/java/teetime/framework/pipe/SpScPipe.java
@@ -30,7 +30,7 @@ final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe
 	private int numWaits;
 
 	<T> SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
-		super(sourcePort, targetPort);
+		super(sourcePort, targetPort, capacity);
 		this.queue = new ObservableSpScArrayQueue<Object>(capacity);
 	}
 
diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
index 37891446..954c5918 100644
--- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
+++ b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
@@ -31,7 +31,7 @@ final class UnboundedSpScPipe extends AbstractInterThreadPipe {
 	private final Queue<Object> queue;
 
 	<T> UnboundedSpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
-		super(sourcePort, targetPort);
+		super(sourcePort, targetPort, Integer.MAX_VALUE);
 		ConcurrentQueueSpec specification = new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT);
 		this.queue = QueueFactory.newQueue(specification);
 	}
diff --git a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
index ad77ffde..62b2d97b 100644
--- a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
+++ b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
@@ -69,6 +69,11 @@ class MergerTestingPipe implements IPipe {
 		return 0;
 	}
 
+	@Override
+	public int capacity() {
+		return 0;
+	}
+
 	@Override
 	public Object removeLast() {
 		return null;
-- 
GitLab