From 7a11950235de888184b56a67d735c067058d2988 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Fri, 13 Feb 2015 11:07:02 +0100
Subject: [PATCH] added UnboundedSpScPipe

---
 .../java/teetime/framework/pipe/SpScPipe.java |  4 +-
 .../framework/pipe/UnboundedSpScPipe.java     | 77 +++++++++++++++++++
 .../pipe/UnboundedSpScPipeFactory.java        | 55 +++++++++++++
 src/main/resources/pipe-factories.conf        |  3 +-
 4 files changed, 135 insertions(+), 4 deletions(-)
 create mode 100644 src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
 create mode 100644 src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java

diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java
index 0641f975..dc391f35 100644
--- a/src/main/java/teetime/framework/pipe/SpScPipe.java
+++ b/src/main/java/teetime/framework/pipe/SpScPipe.java
@@ -19,8 +19,6 @@ import java.util.Queue;
 
 import org.jctools.queues.QueueFactory;
 import org.jctools.queues.spec.ConcurrentQueueSpec;
-import org.jctools.queues.spec.Ordering;
-import org.jctools.queues.spec.Preference;
 
 import teetime.framework.AbstractInterThreadPipe;
 import teetime.framework.InputPort;
@@ -36,7 +34,7 @@ public final class SpScPipe extends AbstractInterThreadPipe {
 
 	<T> SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
 		super(sourcePort, targetPort);
-		this.queue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, capacity, Ordering.FIFO, Preference.THROUGHPUT));
+		this.queue = QueueFactory.newQueue(ConcurrentQueueSpec.createBoundedSpsc(capacity));
 	}
 
 	@Deprecated
diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
new file mode 100644
index 00000000..87cfff3a
--- /dev/null
+++ b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package teetime.framework.pipe;
+
+import java.util.Queue;
+
+import org.jctools.queues.QueueFactory;
+import org.jctools.queues.spec.ConcurrentQueueSpec;
+import org.jctools.queues.spec.Ordering;
+import org.jctools.queues.spec.Preference;
+
+import teetime.framework.AbstractInterThreadPipe;
+import teetime.framework.InputPort;
+import teetime.framework.OutputPort;
+
+public final class UnboundedSpScPipe extends AbstractInterThreadPipe {
+
+	private final Queue<Object> queue;
+	// statistics
+	private int numWaits;
+
+	<T> UnboundedSpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+		super(sourcePort, targetPort);
+		ConcurrentQueueSpec specification = new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT);
+		this.queue = QueueFactory.newQueue(specification);
+	}
+
+	@Override
+	public boolean add(final Object element) {
+		// BETTER introduce a QueueIsFullStrategy
+		while (!this.queue.offer(element)) {
+			this.numWaits++;
+			Thread.yield();
+		}
+		// this.reportNewElement();
+		return true;
+	}
+
+	@Override
+	public Object removeLast() {
+		return this.queue.poll();
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return this.queue.isEmpty();
+	}
+
+	@Override
+	public int size() {
+		return this.queue.size();
+	}
+
+	@Override
+	public Object readLast() {
+		return this.queue.peek();
+	}
+
+	// BETTER find a solution w/o any thread-safe code in this stage
+	public synchronized int getNumWaits() {
+		return this.numWaits;
+	}
+
+}
diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java
new file mode 100644
index 00000000..d342405f
--- /dev/null
+++ b/src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package teetime.framework.pipe;
+
+import teetime.framework.InputPort;
+import teetime.framework.OutputPort;
+import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
+import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
+
+public class UnboundedSpScPipeFactory implements IPipeFactory {
+
+	@Override
+	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
+		return this.create(sourcePort, targetPort, 0);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 *
+	 * The capacity is ignored.
+	 */
+	@Override
+	public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
+		return new UnboundedSpScPipe(sourcePort, targetPort);
+	}
+
+	@Override
+	public ThreadCommunication getThreadCommunication() {
+		return ThreadCommunication.INTER;
+	}
+
+	@Override
+	public PipeOrdering getOrdering() {
+		return PipeOrdering.QUEUE_BASED;
+	}
+
+	@Override
+	public boolean isGrowable() {
+		return true;
+	}
+
+}
diff --git a/src/main/resources/pipe-factories.conf b/src/main/resources/pipe-factories.conf
index e3d966cf..3e3db904 100644
--- a/src/main/resources/pipe-factories.conf
+++ b/src/main/resources/pipe-factories.conf
@@ -1,4 +1,5 @@
 teetime.framework.pipe.SingleElementPipeFactory
 teetime.framework.pipe.OrderedGrowableArrayPipeFactory
 teetime.framework.pipe.UnorderedGrowablePipeFactory
-teetime.framework.pipe.SpScPipeFactory
\ No newline at end of file
+teetime.framework.pipe.SpScPipeFactory
+teetime.framework.pipe.UnboundedSpScPipeFactory
-- 
GitLab