Skip to content
Snippets Groups Projects
Commit e7a2885c authored by Christian Wulf's avatar Christian Wulf
Browse files

added ObservableSpScArrayQueue

parent 821e1f36
No related branches found
No related tags found
No related merge requests found
package org.jctools.queues;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
public final class ObservableSpScArrayQueue<E> implements Queue<E> {
private final SpscArrayQueue<E> queue;
private transient long lastProducerIndex, lastConsumerIndex;
public ObservableSpScArrayQueue(final int requestedCapacity) {
this.queue = new SpscArrayQueue<E>(requestedCapacity);
}
public long getProducerFrequency() {
final long currentProducerIndex = queue.lvProducerIndex();
long diff = currentProducerIndex - lastProducerIndex;
lastProducerIndex = currentProducerIndex;
return diff;
}
public long getConsumerFrequency() {
final long currentConsumerIndex = queue.lvConsumerIndex();
long diff = currentConsumerIndex - lastConsumerIndex;
lastConsumerIndex = currentConsumerIndex;
return diff;
}
@Override
public int hashCode() {
return queue.hashCode();
}
@Override
public boolean add(final E e) {
return queue.add(e);
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
@Override
public boolean contains(final Object o) {
return queue.contains(o);
}
@Override
public E remove() {
return queue.remove();
}
@Override
public Object[] toArray() {
return queue.toArray();
}
@Override
public boolean equals(final Object obj) {
if (obj instanceof ObservableSpScArrayQueue) {
return queue.equals(((ObservableSpScArrayQueue<?>) obj).queue);
}
return false;
}
@Override
public E element() {
return queue.element();
}
@Override
public boolean offer(final E e) {
return queue.offer(e);
}
@Override
public <T> T[] toArray(final T[] a) {
return queue.toArray(a);
}
@Override
public boolean addAll(final Collection<? extends E> c) {
return queue.addAll(c);
}
@Override
public E poll() {
return queue.poll();
}
@Override
public E peek() {
return queue.peek();
}
@Override
public int size() {
return queue.size();
}
@Override
public Iterator<E> iterator() {
return queue.iterator();
}
@Override
public void clear() {
queue.clear();
}
@Override
public boolean remove(final Object o) {
return queue.remove(o);
}
@Override
public boolean containsAll(final Collection<?> c) {
return queue.containsAll(c);
}
@Override
public boolean removeAll(final Collection<?> c) {
return queue.removeAll(c);
}
@Override
public boolean retainAll(final Collection<?> c) {
return queue.retainAll(c);
}
@Override
public String toString() {
return queue.toString();
}
}
...@@ -78,4 +78,8 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { ...@@ -78,4 +78,8 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe {
public final void close() { public final void close() {
isClosed = true; isClosed = true;
} }
public abstract long getPushThroughput();
public abstract long getPullThroughput();
} }
...@@ -59,4 +59,14 @@ public final class RelayTestPipe<T> extends AbstractInterThreadPipe { ...@@ -59,4 +59,14 @@ public final class RelayTestPipe<T> extends AbstractInterThreadPipe {
return this.numInputObjects; return this.numInputObjects;
} }
@Override
public long getPushThroughput() {
return -1;
}
@Override
public long getPullThroughput() {
return -1;
}
} }
...@@ -15,10 +15,7 @@ ...@@ -15,10 +15,7 @@
*/ */
package teetime.framework.pipe; package teetime.framework.pipe;
import java.util.Queue; import org.jctools.queues.ObservableSpScArrayQueue;
import org.jctools.queues.QueueFactory;
import org.jctools.queues.spec.ConcurrentQueueSpec;
import teetime.framework.AbstractInterThreadPipe; import teetime.framework.AbstractInterThreadPipe;
import teetime.framework.InputPort; import teetime.framework.InputPort;
...@@ -28,13 +25,13 @@ public final class SpScPipe extends AbstractInterThreadPipe { ...@@ -28,13 +25,13 @@ public final class SpScPipe extends AbstractInterThreadPipe {
// private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class); // private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class);
private final Queue<Object> queue; private final ObservableSpScArrayQueue<Object> queue;
// statistics // statistics
private int numWaits; private int numWaits;
<T> SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { <T> SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort); super(sourcePort, targetPort);
this.queue = QueueFactory.newQueue(ConcurrentQueueSpec.createBoundedSpsc(capacity)); this.queue = new ObservableSpScArrayQueue<Object>(capacity);
} }
@Deprecated @Deprecated
...@@ -86,4 +83,14 @@ public final class SpScPipe extends AbstractInterThreadPipe { ...@@ -86,4 +83,14 @@ public final class SpScPipe extends AbstractInterThreadPipe {
return this.numWaits; return this.numWaits;
} }
@Override
public long getPushThroughput() {
return queue.getProducerFrequency();
}
@Override
public long getPullThroughput() {
return queue.getConsumerFrequency();
}
} }
...@@ -61,4 +61,14 @@ public final class UnboundedSpScPipe extends AbstractInterThreadPipe { ...@@ -61,4 +61,14 @@ public final class UnboundedSpScPipe extends AbstractInterThreadPipe {
return this.queue.size(); return this.queue.size();
} }
@Override
public long getPushThroughput() {
return -1;
}
@Override
public long getPullThroughput() {
return -1;
}
} }
wiki @ 0e447457
Subproject commit 63ccbbc87bd2c0e6599ca91502149dba3cfb99de Subproject commit 0e4474577e1f49bc96e734c286b2d9e0363895e8
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment