From b85916cc3b1515d0ae4ababf7dcb749ffab88cc8 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Tue, 8 Jul 2014 08:16:00 +0200 Subject: [PATCH] removed own spsc implementation; added maven dep: jctools --- pom.xml | 14 + .../util/concurrent/PaddedAtomicLong.java | 28 -- .../concurrent/spsc/FFBufferOrdered3.java | 305 ------------------ .../teetime/util/concurrent/spsc/Pow2.java | 16 - .../util/concurrent/spsc/UnsafeAccess.java | 26 -- .../framework/core/pipe/SpScPipe.java | 12 +- 6 files changed, 23 insertions(+), 378 deletions(-) delete mode 100644 src/main/java/teetime/util/concurrent/PaddedAtomicLong.java delete mode 100644 src/main/java/teetime/util/concurrent/spsc/FFBufferOrdered3.java delete mode 100644 src/main/java/teetime/util/concurrent/spsc/Pow2.java delete mode 100644 src/main/java/teetime/util/concurrent/spsc/UnsafeAccess.java diff --git a/pom.xml b/pom.xml index c5e63b6d..1a5601f8 100644 --- a/pom.xml +++ b/pom.xml @@ -48,10 +48,24 @@ <artifactId>guava</artifactId> <version>17.0</version> </dependency> + <dependency> + <groupId>org.jctools</groupId> + <artifactId>jctools-core</artifactId> + <version>1.0-SNAPSHOT</version> + </dependency> </dependencies> <build> <plugins> + <!-- we want JDK 1.6 source and binary compatiblility --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.6</source> + <target>1.6</target> + </configuration> + </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> diff --git a/src/main/java/teetime/util/concurrent/PaddedAtomicLong.java b/src/main/java/teetime/util/concurrent/PaddedAtomicLong.java deleted file mode 100644 index eeffdaf2..00000000 --- a/src/main/java/teetime/util/concurrent/PaddedAtomicLong.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2012 Real Logic Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.util.concurrent; - -import java.util.concurrent.atomic.AtomicLong; - -public class PaddedAtomicLong extends AtomicLong { - public PaddedAtomicLong() {} - - public PaddedAtomicLong(final long initialValue) { - super(initialValue); - } - - public volatile long p1, p2, p3, p4, p5, p6 = 7; -} diff --git a/src/main/java/teetime/util/concurrent/spsc/FFBufferOrdered3.java b/src/main/java/teetime/util/concurrent/spsc/FFBufferOrdered3.java deleted file mode 100644 index 9d8f0edb..00000000 --- a/src/main/java/teetime/util/concurrent/spsc/FFBufferOrdered3.java +++ /dev/null @@ -1,305 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package teetime.util.concurrent.spsc; - -import java.util.Collection; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.Queue; - -/** - * <ul> - * <li>Inlined counters - * <li>Counters are padded - * <li>Data is padded - * <li>Class is pre-padded - * <li>Padding is doubled to dodge pre-fetch - * <li>Use Unsafe to read out of array - * <li>putOrdered into array as Write Memory Barrier - * <li>getVolatile from array as Read Memory Barrier - * <li>ELEMENT_SHIFT is a constant - * </ul> - */ -class L0Pad { - public long p00, p01, p02, p03, p04, p05, p06, p07; - public long p30, p31, p32, p33, p34, p35, p36, p37; -} - -class ColdFields<E> extends L0Pad { - protected static final int BUFFER_PAD = 32; - protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 2); - protected final int capacity; - protected final long mask; - protected final E[] buffer; - - @SuppressWarnings("unchecked") - public ColdFields(final int capacity) { - if (Pow2.isPowerOf2(capacity)) { - this.capacity = capacity; - } - else { - this.capacity = Pow2.findNextPositivePowerOfTwo(capacity); - } - this.mask = this.capacity - 1; - // pad data on either end with some empty slots. - this.buffer = (E[]) new Object[(this.capacity << SPARSE_SHIFT) + BUFFER_PAD * 2]; - } -} - -class L1Pad<E> extends ColdFields<E> { - public long p10, p11, p12, p13, p14, p15, p16; - public long p30, p31, p32, p33, p34, p35, p36, p37; - - public L1Pad(final int capacity) { - super(capacity); - } -} - -class TailField<E> extends L1Pad<E> { - protected long tail; - - public TailField(final int capacity) { - super(capacity); - } -} - -class L2Pad<E> extends TailField<E> { - public long p20, p21, p22, p23, p24, p25, p26; - public long p30, p31, p32, p33, p34, p35, p36, p37; - - public L2Pad(final int capacity) { - super(capacity); - } -} - -class HeadField<E> extends L2Pad<E> { - protected long head; - - public HeadField(final int capacity) { - super(capacity); - } -} - -class L3Pad<E> extends HeadField<E> { - public long p40, p41, p42, p43, p44, p45, p46; - public long p30, p31, p32, p33, p34, p35, p36, p37; - - public L3Pad(final int capacity) { - super(capacity); - } -} - -@SuppressWarnings("restriction") -public final class FFBufferOrdered3<E> extends L3Pad<E> implements Queue<E> { - private final static long TAIL_OFFSET; - private final static long HEAD_OFFSET; - private static final long ARRAY_BASE; - private static final int ELEMENT_SHIFT; - static { - try { - TAIL_OFFSET = UnsafeAccess.UNSAFE.objectFieldOffset(TailField.class.getDeclaredField("tail")); - HEAD_OFFSET = UnsafeAccess.UNSAFE.objectFieldOffset(HeadField.class.getDeclaredField("head")); - final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class); - if (4 == scale) { - ELEMENT_SHIFT = 2 + SPARSE_SHIFT; - } else if (8 == scale) { - ELEMENT_SHIFT = 3 + SPARSE_SHIFT; - } else { - throw new IllegalStateException("Unknown pointer size"); - } - // Including the buffer pad in the array base offset - ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << (ELEMENT_SHIFT - SPARSE_SHIFT)); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - - public FFBufferOrdered3(final int capacity) { - super(capacity); - } - - private long getHead() { - return UnsafeAccess.UNSAFE.getLongVolatile(this, HEAD_OFFSET); - } - - private long getTail() { - return UnsafeAccess.UNSAFE.getLongVolatile(this, TAIL_OFFSET); - } - - @Override - public boolean add(final E e) { - if (this.offer(e)) { - return true; - } - throw new IllegalStateException("Queue is full"); - } - - private long elementOffsetInBuffer(final long index) { - return ARRAY_BASE + ((index & this.mask) << ELEMENT_SHIFT); - } - - // private boolean available() { - // final long offset = this.elementOffsetInBuffer(this.tail); - // Object object = UnsafeAccess.UNSAFE.getObjectVolatile(this.buffer, offset); - // return object != null; - // } - - @Override - public boolean offer(final E e) { - if (null == e) { - throw new NullPointerException("Null is not a valid element"); - } - - final long offset = this.elementOffsetInBuffer(this.tail); - if (null != UnsafeAccess.UNSAFE.getObjectVolatile(this.buffer, offset)) { - return false; - } - // STORE/STORE barrier, anything that happens before is visible - // when the value in the buffer is visible - UnsafeAccess.UNSAFE.putOrderedObject(this.buffer, offset, e); - this.tail++; - return true; - } - - @Override - public E poll() { - final long offset = this.elementOffsetInBuffer(this.head); - @SuppressWarnings("unchecked") - final E e = (E) UnsafeAccess.UNSAFE.getObjectVolatile(this.buffer, offset); - if (null == e) { - return null; - } - UnsafeAccess.UNSAFE.putOrderedObject(this.buffer, offset, null); - this.head++; - return e; - } - - @Override - public E remove() { - final E e = this.poll(); - if (null == e) { - throw new NoSuchElementException("Queue is empty"); - } - - return e; - } - - @Override - public E element() { - final E e = this.peek(); - if (null == e) { - throw new NoSuchElementException("Queue is empty"); - } - - return e; - } - - @Override - public E peek() { - long currentHead = this.getHead(); - return this.getElement(currentHead); - } - - @SuppressWarnings("unchecked") - private E getElement(final long index) { - return (E) UnsafeAccess.UNSAFE.getObject(this.buffer, this.elementOffsetInBuffer(index)); - } - - @Override - public int size() { - return (int) (this.getTail() - this.getHead()); - } - - @Override - public boolean isEmpty() { - return this.getTail() == this.getHead(); - // return null == this.getHead(); - } - - @Override - public boolean contains(final Object o) { - if (null == o) { - return false; - } - - for (long i = this.getHead(), limit = this.getTail(); i < limit; i++) { - final E e = this.getElement(i); - if (o.equals(e)) { - return true; - } - } - - return false; - } - - @Override - public Iterator<E> iterator() { - throw new UnsupportedOperationException(); - } - - @Override - public Object[] toArray() { - throw new UnsupportedOperationException(); - } - - @Override - public <T> T[] toArray(final T[] a) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean remove(final Object o) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean containsAll(final Collection<?> c) { - for (final Object o : c) { - if (!this.contains(o)) { - return false; - } - } - - return true; - } - - @Override - public boolean addAll(final Collection<? extends E> c) { - for (final E e : c) { - this.add(e); - } - - return true; - } - - @Override - public boolean removeAll(final Collection<?> c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean retainAll(final Collection<?> c) { - throw new UnsupportedOperationException(); - } - - @Override - public void clear() { - Object value; - do { - value = this.poll(); - } while (null != value); - } - -} diff --git a/src/main/java/teetime/util/concurrent/spsc/Pow2.java b/src/main/java/teetime/util/concurrent/spsc/Pow2.java deleted file mode 100644 index 266964aa..00000000 --- a/src/main/java/teetime/util/concurrent/spsc/Pow2.java +++ /dev/null @@ -1,16 +0,0 @@ -package teetime.util.concurrent.spsc; - -public class Pow2 { - - private Pow2() { - // utility class - } - - public static int findNextPositivePowerOfTwo(final int value) { - return 1 << (32 - Integer.numberOfLeadingZeros(value - 1)); - } - - public static boolean isPowerOf2(final int value) { - return (value & (value - 1)) == 0; - } -} diff --git a/src/main/java/teetime/util/concurrent/spsc/UnsafeAccess.java b/src/main/java/teetime/util/concurrent/spsc/UnsafeAccess.java deleted file mode 100644 index 0c717842..00000000 --- a/src/main/java/teetime/util/concurrent/spsc/UnsafeAccess.java +++ /dev/null @@ -1,26 +0,0 @@ -package teetime.util.concurrent.spsc; - -import java.lang.reflect.Field; - -import sun.misc.Unsafe; - -@SuppressWarnings("restriction") -class UnsafeAccess { - - public static final Unsafe UNSAFE; - - static { - try { - Field field = Unsafe.class.getDeclaredField("theUnsafe"); - field.setAccessible(true); - UNSAFE = (Unsafe) field.get(null); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private UnsafeAccess() { - // utility class - } - -} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java index fd2014a9..836af47b 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java @@ -1,20 +1,26 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; +import java.util.Queue; import java.util.concurrent.atomic.AtomicReference; -import teetime.util.concurrent.spsc.FFBufferOrdered3; +import org.jctools.queues.QueueFactory; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.jctools.queues.spec.Preference; + import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.Signal; public class SpScPipe<T> extends AbstractPipe<T> { - private final FFBufferOrdered3<T> queue; + private final Queue<T> queue; private int maxSize; private final AtomicReference<Signal> signal = new AtomicReference<Signal>(); public SpScPipe(final int capacity) { - this.queue = new FFBufferOrdered3<T>(capacity); + ConcurrentQueueSpec concurrentQueueSpec = new ConcurrentQueueSpec(1, 1, capacity, Ordering.FIFO, Preference.THROUGHPUT); + this.queue = QueueFactory.newQueue(concurrentQueueSpec); } public static <T> SpScPipe<T> connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort, final int capacity) { -- GitLab