From e7a2885c71bb5b149d5b58b5af877675e60d22c5 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Tue, 24 Mar 2015 11:02:30 +0100
Subject: [PATCH] added ObservableSpScArrayQueue

---
 .../queues/ObservableSpScArrayQueue.java      | 139 ++++++++++++++++++
 .../framework/AbstractInterThreadPipe.java    |   4 +
 .../teetime/framework/pipe/RelayTestPipe.java |  10 ++
 .../java/teetime/framework/pipe/SpScPipe.java |  19 ++-
 .../framework/pipe/UnboundedSpScPipe.java     |  10 ++
 src/site/markdown/wiki                        |   2 +-
 6 files changed, 177 insertions(+), 7 deletions(-)
 create mode 100644 src/main/java/org/jctools/queues/ObservableSpScArrayQueue.java

diff --git a/src/main/java/org/jctools/queues/ObservableSpScArrayQueue.java b/src/main/java/org/jctools/queues/ObservableSpScArrayQueue.java
new file mode 100644
index 00000000..687ab9d4
--- /dev/null
+++ b/src/main/java/org/jctools/queues/ObservableSpScArrayQueue.java
@@ -0,0 +1,139 @@
+package org.jctools.queues;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Queue;
+
+public final class ObservableSpScArrayQueue<E> implements Queue<E> {
+
+	private final SpscArrayQueue<E> queue;
+
+	private transient long lastProducerIndex, lastConsumerIndex;
+
+	public ObservableSpScArrayQueue(final int requestedCapacity) {
+		this.queue = new SpscArrayQueue<E>(requestedCapacity);
+	}
+
+	public long getProducerFrequency() {
+		final long currentProducerIndex = queue.lvProducerIndex();
+		long diff = currentProducerIndex - lastProducerIndex;
+		lastProducerIndex = currentProducerIndex;
+		return diff;
+	}
+
+	public long getConsumerFrequency() {
+		final long currentConsumerIndex = queue.lvConsumerIndex();
+		long diff = currentConsumerIndex - lastConsumerIndex;
+		lastConsumerIndex = currentConsumerIndex;
+		return diff;
+	}
+
+	@Override
+	public int hashCode() {
+		return queue.hashCode();
+	}
+
+	@Override
+	public boolean add(final E e) {
+		return queue.add(e);
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return queue.isEmpty();
+	}
+
+	@Override
+	public boolean contains(final Object o) {
+		return queue.contains(o);
+	}
+
+	@Override
+	public E remove() {
+		return queue.remove();
+	}
+
+	@Override
+	public Object[] toArray() {
+		return queue.toArray();
+	}
+
+	@Override
+	public boolean equals(final Object obj) {
+		if (obj instanceof ObservableSpScArrayQueue) {
+			return queue.equals(((ObservableSpScArrayQueue<?>) obj).queue);
+		}
+		return false;
+	}
+
+	@Override
+	public E element() {
+		return queue.element();
+	}
+
+	@Override
+	public boolean offer(final E e) {
+		return queue.offer(e);
+	}
+
+	@Override
+	public <T> T[] toArray(final T[] a) {
+		return queue.toArray(a);
+	}
+
+	@Override
+	public boolean addAll(final Collection<? extends E> c) {
+		return queue.addAll(c);
+	}
+
+	@Override
+	public E poll() {
+		return queue.poll();
+	}
+
+	@Override
+	public E peek() {
+		return queue.peek();
+	}
+
+	@Override
+	public int size() {
+		return queue.size();
+	}
+
+	@Override
+	public Iterator<E> iterator() {
+		return queue.iterator();
+	}
+
+	@Override
+	public void clear() {
+		queue.clear();
+	}
+
+	@Override
+	public boolean remove(final Object o) {
+		return queue.remove(o);
+	}
+
+	@Override
+	public boolean containsAll(final Collection<?> c) {
+		return queue.containsAll(c);
+	}
+
+	@Override
+	public boolean removeAll(final Collection<?> c) {
+		return queue.removeAll(c);
+	}
+
+	@Override
+	public boolean retainAll(final Collection<?> c) {
+		return queue.retainAll(c);
+	}
+
+	@Override
+	public String toString() {
+		return queue.toString();
+	}
+
+}
diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java
index 6c3e3061..d759ac47 100644
--- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java
+++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java
@@ -78,4 +78,8 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe {
 	public final void close() {
 		isClosed = true;
 	}
+
+	public abstract long getPushThroughput();
+
+	public abstract long getPullThroughput();
 }
diff --git a/src/main/java/teetime/framework/pipe/RelayTestPipe.java b/src/main/java/teetime/framework/pipe/RelayTestPipe.java
index 9afebf5f..767f7f31 100644
--- a/src/main/java/teetime/framework/pipe/RelayTestPipe.java
+++ b/src/main/java/teetime/framework/pipe/RelayTestPipe.java
@@ -59,4 +59,14 @@ public final class RelayTestPipe<T> extends AbstractInterThreadPipe {
 		return this.numInputObjects;
 	}
 
+	@Override
+	public long getPushThroughput() {
+		return -1;
+	}
+
+	@Override
+	public long getPullThroughput() {
+		return -1;
+	}
+
 }
diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java
index 106506de..f19e41ef 100644
--- a/src/main/java/teetime/framework/pipe/SpScPipe.java
+++ b/src/main/java/teetime/framework/pipe/SpScPipe.java
@@ -15,10 +15,7 @@
  */
 package teetime.framework.pipe;
 
-import java.util.Queue;
-
-import org.jctools.queues.QueueFactory;
-import org.jctools.queues.spec.ConcurrentQueueSpec;
+import org.jctools.queues.ObservableSpScArrayQueue;
 
 import teetime.framework.AbstractInterThreadPipe;
 import teetime.framework.InputPort;
@@ -28,13 +25,13 @@ public final class SpScPipe extends AbstractInterThreadPipe {
 
 	// private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class);
 
-	private final Queue<Object> queue;
+	private final ObservableSpScArrayQueue<Object> queue;
 	// statistics
 	private int numWaits;
 
 	<T> SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
 		super(sourcePort, targetPort);
-		this.queue = QueueFactory.newQueue(ConcurrentQueueSpec.createBoundedSpsc(capacity));
+		this.queue = new ObservableSpScArrayQueue<Object>(capacity);
 	}
 
 	@Deprecated
@@ -86,4 +83,14 @@ public final class SpScPipe extends AbstractInterThreadPipe {
 		return this.numWaits;
 	}
 
+	@Override
+	public long getPushThroughput() {
+		return queue.getProducerFrequency();
+	}
+
+	@Override
+	public long getPullThroughput() {
+		return queue.getConsumerFrequency();
+	}
+
 }
diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
index ded5c049..207225c0 100644
--- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
+++ b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java
@@ -61,4 +61,14 @@ public final class UnboundedSpScPipe extends AbstractInterThreadPipe {
 		return this.queue.size();
 	}
 
+	@Override
+	public long getPushThroughput() {
+		return -1;
+	}
+
+	@Override
+	public long getPullThroughput() {
+		return -1;
+	}
+
 }
diff --git a/src/site/markdown/wiki b/src/site/markdown/wiki
index 63ccbbc8..0e447457 160000
--- a/src/site/markdown/wiki
+++ b/src/site/markdown/wiki
@@ -1 +1 @@
-Subproject commit 63ccbbc87bd2c0e6599ca91502149dba3cfb99de
+Subproject commit 0e4474577e1f49bc96e734c286b2d9e0363895e8
-- 
GitLab