Skip to content
Snippets Groups Projects
Commit 14d733bb authored by Christian Wulf's avatar Christian Wulf
Browse files

added spsc pipe

parent 641c8726
No related branches found
No related tags found
No related merge requests found
...@@ -26,3 +26,4 @@ ...@@ -26,3 +26,4 @@
11: 7800 ns (executeWithPorts: fixed sized pipe; with setReschedulable() after each read) 11: 7800 ns (executeWithPorts: fixed sized pipe; with setReschedulable() after each read)
12: 3300 ns (recursive; argument/return w/o pipe) 12: 3300 ns (recursive; argument/return w/o pipe)
13: 3300 ns (recursive; argument/return w/o pipe; w/o pipeline class) 13: 3300 ns (recursive; argument/return w/o pipe; w/o pipeline class)
14: 21,000 ns (spsc pipe)
/* /*
* Copyright 2012 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
...@@ -13,95 +11,182 @@ ...@@ -13,95 +11,182 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package teetime.util.concurrent;
package teetime.util.concurrent.spsc;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* <ul> * <ul>
* <li>Lock free, observing single writer principal. * <li>Inlined counters
* <li>Replacing the long fields with AtomicLong and using lazySet instead of volatile assignment. * <li>Counters are padded
* <li>Using the power of 2 mask, forcing the capacity to next power of 2. * <li>Data is padded
* <li>Adding head and tail cache fields. Avoiding redundant volatile reads. * <li>Class is pre-padded
* <li>Padding head/tail AtomicLong fields. Avoiding false sharing. * <li>Padding is doubled to dodge pre-fetch
* <li>Padding head/tail cache fields. Avoiding false sharing. * <li>Use Unsafe to read out of array
* <li>putOrdered into array as Write Memory Barrier
* <li>getVolatile from array as Read Memory Barrier
* <li>ELEMENT_SHIFT is a constant
* </ul> * </ul>
*/ */
public final class P1C1QueueOriginal3<E> implements Queue<E> { class L0Pad {
private final int capacity; public long p00, p01, p02, p03, p04, p05, p06, p07;
private final int mask; public long p30, p31, p32, p33, p34, p35, p36, p37;
private final E[] buffer; }
private final AtomicLong tail = new PaddedAtomicLong(0); class ColdFields<E> extends L0Pad {
private final AtomicLong head = new PaddedAtomicLong(0); protected static final int BUFFER_PAD = 32;
protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 2);
protected final int capacity;
protected final long mask;
protected final E[] buffer;
public static class PaddedLong { @SuppressWarnings("unchecked")
public long value = 0, p1, p2, p3, p4, p5, p6; public ColdFields(final int capacity) {
if (Pow2.isPowerOf2(capacity)) {
this.capacity = capacity;
}
else {
this.capacity = Pow2.findNextPositivePowerOfTwo(capacity);
}
this.mask = this.capacity - 1;
// pad data on either end with some empty slots.
this.buffer = (E[]) new Object[(this.capacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
} }
}
private final PaddedLong tailCache = new PaddedLong(); class L1Pad<E> extends ColdFields<E> {
private final PaddedLong headCache = new PaddedLong(); public long p10, p11, p12, p13, p14, p15, p16;
public long p30, p31, p32, p33, p34, p35, p36, p37;
@SuppressWarnings("unchecked") public L1Pad(final int capacity) {
public P1C1QueueOriginal3(final int capacity) { super(capacity);
this.capacity = P1C1QueueOriginal3.findNextPositivePowerOfTwo(capacity); }
this.mask = this.capacity - 1; }
this.buffer = (E[]) new Object[this.capacity];
class TailField<E> extends L1Pad<E> {
protected long tail;
public TailField(final int capacity) {
super(capacity);
}
}
class L2Pad<E> extends TailField<E> {
public long p20, p21, p22, p23, p24, p25, p26;
public long p30, p31, p32, p33, p34, p35, p36, p37;
public L2Pad(final int capacity) {
super(capacity);
}
}
class HeadField<E> extends L2Pad<E> {
protected long head;
public HeadField(final int capacity) {
super(capacity);
}
}
class L3Pad<E> extends HeadField<E> {
public long p40, p41, p42, p43, p44, p45, p46;
public long p30, p31, p32, p33, p34, p35, p36, p37;
public L3Pad(final int capacity) {
super(capacity);
}
}
@SuppressWarnings("restriction")
public final class FFBufferOrdered3<E> extends L3Pad<E> implements Queue<E> {
private final static long TAIL_OFFSET;
private final static long HEAD_OFFSET;
private static final long ARRAY_BASE;
private static final int ELEMENT_SHIFT;
static {
try {
TAIL_OFFSET = UnsafeAccess.UNSAFE.objectFieldOffset(TailField.class.getDeclaredField("tail"));
HEAD_OFFSET = UnsafeAccess.UNSAFE.objectFieldOffset(HeadField.class.getDeclaredField("head"));
final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale) {
ELEMENT_SHIFT = 2 + SPARSE_SHIFT;
} else if (8 == scale) {
ELEMENT_SHIFT = 3 + SPARSE_SHIFT;
} else {
throw new IllegalStateException("Unknown pointer size");
}
// Including the buffer pad in the array base offset
ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << (ELEMENT_SHIFT - SPARSE_SHIFT));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
}
public FFBufferOrdered3(final int capacity) {
super(capacity);
} }
public static int findNextPositivePowerOfTwo(final int value) { private long getHead() {
return 1 << (32 - Integer.numberOfLeadingZeros(value - 1)); return UnsafeAccess.UNSAFE.getLongVolatile(this, HEAD_OFFSET);
} }
private long getTail() {
return UnsafeAccess.UNSAFE.getLongVolatile(this, TAIL_OFFSET);
}
@Override
public boolean add(final E e) { public boolean add(final E e) {
if (this.offer(e)) { if (this.offer(e)) {
return true; return true;
} }
throw new IllegalStateException("Queue is full"); throw new IllegalStateException("Queue is full");
} }
private long elementOffsetInBuffer(final long index) {
return ARRAY_BASE + ((index & this.mask) << ELEMENT_SHIFT);
}
// private boolean available() {
// final long offset = this.elementOffsetInBuffer(this.tail);
// Object object = UnsafeAccess.UNSAFE.getObjectVolatile(this.buffer, offset);
// return object != null;
// }
@Override
public boolean offer(final E e) { public boolean offer(final E e) {
if (null == e) { if (null == e) {
throw new NullPointerException("Null is not a valid element"); throw new NullPointerException("Null is not a valid element");
} }
final long currentTail = this.tail.get(); final long offset = this.elementOffsetInBuffer(this.tail);
final long wrapPoint = currentTail - this.capacity; if (null != UnsafeAccess.UNSAFE.getObjectVolatile(this.buffer, offset)) {
if (this.headCache.value <= wrapPoint) { return false;
this.headCache.value = this.head.get();
if (this.headCache.value <= wrapPoint) {
return false;
}
} }
// STORE/STORE barrier, anything that happens before is visible
this.buffer[(int) currentTail & this.mask] = e; // when the value in the buffer is visible
this.tail.lazySet(currentTail + 1); UnsafeAccess.UNSAFE.putOrderedObject(this.buffer, offset, e);
this.tail++;
return true; return true;
} }
@Override
public E poll() { public E poll() {
final long currentHead = this.head.get(); final long offset = this.elementOffsetInBuffer(this.head);
if (currentHead >= this.tailCache.value) { @SuppressWarnings("unchecked")
this.tailCache.value = this.tail.get(); final E e = (E) UnsafeAccess.UNSAFE.getObjectVolatile(this.buffer, offset);
if (currentHead >= this.tailCache.value) { if (null == e) {
return null; return null;
}
} }
UnsafeAccess.UNSAFE.putOrderedObject(this.buffer, offset, null);
final int index = (int) currentHead & this.mask; this.head++;
final E e = this.buffer[index];
this.buffer[index] = null;
this.head.lazySet(currentHead + 1);
return e; return e;
} }
@Override
public E remove() { public E remove() {
final E e = this.poll(); final E e = this.poll();
if (null == e) { if (null == e) {
...@@ -111,6 +196,7 @@ public final class P1C1QueueOriginal3<E> implements Queue<E> { ...@@ -111,6 +196,7 @@ public final class P1C1QueueOriginal3<E> implements Queue<E> {
return e; return e;
} }
@Override
public E element() { public E element() {
final E e = this.peek(); final E e = this.peek();
if (null == e) { if (null == e) {
...@@ -120,25 +206,36 @@ public final class P1C1QueueOriginal3<E> implements Queue<E> { ...@@ -120,25 +206,36 @@ public final class P1C1QueueOriginal3<E> implements Queue<E> {
return e; return e;
} }
@Override
public E peek() { public E peek() {
return this.buffer[(int) this.head.get() & this.mask]; long currentHead = this.getHead();
return this.getElement(currentHead);
}
@SuppressWarnings("unchecked")
private E getElement(final long index) {
return (E) UnsafeAccess.UNSAFE.getObject(this.buffer, this.elementOffsetInBuffer(index));
} }
@Override
public int size() { public int size() {
return (int) (this.tail.get() - this.head.get()); return (int) (this.getTail() - this.getHead());
} }
@Override
public boolean isEmpty() { public boolean isEmpty() {
return this.tail.get() == this.head.get(); return this.getTail() == this.getHead();
// return null == this.getHead();
} }
@Override
public boolean contains(final Object o) { public boolean contains(final Object o) {
if (null == o) { if (null == o) {
return false; return false;
} }
for (long i = this.head.get(), limit = this.tail.get(); i < limit; i++) { for (long i = this.getHead(), limit = this.getTail(); i < limit; i++) {
final E e = this.buffer[(int) i & this.mask]; final E e = this.getElement(i);
if (o.equals(e)) { if (o.equals(e)) {
return true; return true;
} }
...@@ -147,22 +244,27 @@ public final class P1C1QueueOriginal3<E> implements Queue<E> { ...@@ -147,22 +244,27 @@ public final class P1C1QueueOriginal3<E> implements Queue<E> {
return false; return false;
} }
@Override
public Iterator<E> iterator() { public Iterator<E> iterator() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public Object[] toArray() { public Object[] toArray() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public <T> T[] toArray(final T[] a) { public <T> T[] toArray(final T[] a) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public boolean remove(final Object o) { public boolean remove(final Object o) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public boolean containsAll(final Collection<?> c) { public boolean containsAll(final Collection<?> c) {
for (final Object o : c) { for (final Object o : c) {
if (!this.contains(o)) { if (!this.contains(o)) {
...@@ -173,6 +275,7 @@ public final class P1C1QueueOriginal3<E> implements Queue<E> { ...@@ -173,6 +275,7 @@ public final class P1C1QueueOriginal3<E> implements Queue<E> {
return true; return true;
} }
@Override
public boolean addAll(final Collection<? extends E> c) { public boolean addAll(final Collection<? extends E> c) {
for (final E e : c) { for (final E e : c) {
this.add(e); this.add(e);
...@@ -181,18 +284,22 @@ public final class P1C1QueueOriginal3<E> implements Queue<E> { ...@@ -181,18 +284,22 @@ public final class P1C1QueueOriginal3<E> implements Queue<E> {
return true; return true;
} }
@Override
public boolean removeAll(final Collection<?> c) { public boolean removeAll(final Collection<?> c) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public boolean retainAll(final Collection<?> c) { public boolean retainAll(final Collection<?> c) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public void clear() { public void clear() {
Object value; Object value;
do { do {
value = this.poll(); value = this.poll();
} while (null != value); } while (null != value);
} }
} }
package teetime.util.concurrent.spsc;
public class Pow2 {
private Pow2() {
// utility class
}
public static int findNextPositivePowerOfTwo(final int value) {
return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
}
public static boolean isPowerOf2(final int value) {
return (value & (value - 1)) == 0;
}
}
package teetime.util.concurrent.spsc;
import java.lang.reflect.Field;
import sun.misc.Unsafe;
@SuppressWarnings("restriction")
class UnsafeAccess {
public static final Unsafe UNSAFE;
static {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
UNSAFE = (Unsafe) field.get(null);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private UnsafeAccess() {
// utility class
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.examples.throughput;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.junit.Before;
import org.junit.Test;
import teetime.examples.throughput.methodcall.MethodCallThroughputAnalysis14;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
import kieker.common.logging.LogFactory;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class MethodCallThoughputTimestampAnalysis14Test {
private static final int NUM_OBJECTS_TO_CREATE = 100000;
private static final int NUM_NOOP_FILTERS = 800;
@Before
public void before() {
System.setProperty(LogFactory.CUSTOM_LOGGER_JVM, "NONE");
}
@Test
public void testWithManyObjects() {
System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS="
+ NUM_NOOP_FILTERS + "...");
final StopWatch stopWatch = new StopWatch();
final List<TimestampObject> timestampObjects = new ArrayList<TimestampObject>(NUM_OBJECTS_TO_CREATE);
final MethodCallThroughputAnalysis14 analysis = new MethodCallThroughputAnalysis14();
analysis.setNumNoopFilters(NUM_NOOP_FILTERS);
analysis.setTimestampObjects(timestampObjects);
analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() {
@Override
public TimestampObject call() throws Exception {
return new TimestampObject();
}
});
analysis.init();
stopWatch.start();
try {
analysis.start();
} finally {
stopWatch.end();
}
StatisticsUtil.printStatistics(stopWatch.getDurationInNs(), timestampObjects);
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.examples.throughput.methodcall;
import java.util.List;
import java.util.concurrent.Callable;
import teetime.examples.throughput.TimestampObject;
import teetime.examples.throughput.methodcall.stage.CollectorSink;
import teetime.examples.throughput.methodcall.stage.NoopFilter;
import teetime.examples.throughput.methodcall.stage.ObjectProducer;
import teetime.examples.throughput.methodcall.stage.Pipeline;
import teetime.examples.throughput.methodcall.stage.StartTimestampFilter;
import teetime.examples.throughput.methodcall.stage.StopTimestampFilter;
import teetime.framework.core.Analysis;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class MethodCallThroughputAnalysis14 extends Analysis {
private long numInputObjects;
private Callable<TimestampObject> inputObjectCreator;
private int numNoopFilters;
private List<TimestampObject> timestampObjects;
private Runnable runnable;
@Override
public void init() {
super.init();
this.runnable = this.buildPipeline();
}
/**
* @param numNoopFilters
* @since 1.10
*/
private Runnable buildPipeline() {
@SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
// create stages
final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(this.numInputObjects, this.inputObjectCreator);
final StartTimestampFilter startTimestampFilter = new StartTimestampFilter();
for (int i = 0; i < noopFilters.length; i++) {
noopFilters[i] = new NoopFilter<TimestampObject>();
}
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects);
final Pipeline<Void, Object> pipeline = new Pipeline<Void, Object>();
pipeline.setFirstStage(objectProducer);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink);
SpScPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
SpScPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
for (int i = 0; i < noopFilters.length - 1; i++) {
SpScPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
}
SpScPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
SpScPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
pipeline.onStart();
// pipeline.getInputPort().pipe = new Pipe<Void>();
// pipeline.getInputPort().pipe.add(new Object());
// pipeline.getOutputPort().pipe = new Pipe<Void>();
final Runnable runnable = new Runnable() {
@Override
public void run() {
do {
pipeline.executeWithPorts();
} while (pipeline.getSchedulingInformation().isActive() && pipeline.isReschedulable());
}
};
return runnable;
}
@Override
public void start() {
super.start();
this.runnable.run();
}
public void setInput(final int numInputObjects, final Callable<TimestampObject> inputObjectCreator) {
this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator;
}
public int getNumNoopFilters() {
return this.numNoopFilters;
}
public void setNumNoopFilters(final int numNoopFilters) {
this.numNoopFilters = numNoopFilters;
}
public List<TimestampObject> getTimestampObjects() {
return this.timestampObjects;
}
public void setTimestampObjects(final List<TimestampObject> timestampObjects) {
this.timestampObjects = timestampObjects;
}
}
package teetime.examples.throughput.methodcall;
import teetime.util.concurrent.spsc.FFBufferOrdered3;
public class SpScPipe<T> implements IPipe<T> {
private final FFBufferOrdered3<T> queue = new FFBufferOrdered3<T>(4);
public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
IPipe<T> pipe = new SpScPipe<T>();
sourcePort.pipe = pipe;
targetPort.pipe = pipe;
}
@Override
public void add(final T element) {
this.queue.offer(element);
}
@Override
public T removeLast() {
return this.queue.poll();
}
@Override
public boolean isEmpty() {
return this.queue.isEmpty();
}
@Override
public int size() {
return this.queue.size();
}
@Override
public T readLast() {
return this.queue.peek();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment