Skip to content
Snippets Groups Projects
Commit 87e5ca23 authored by Christian Wulf's avatar Christian Wulf
Browse files

added spsc pipe

parent 349651e5
No related branches found
No related tags found
No related merge requests found
......@@ -26,3 +26,4 @@
11: 7800 ns (executeWithPorts: fixed sized pipe; with setReschedulable() after each read)
12: 3300 ns (recursive; argument/return w/o pipe)
13: 3300 ns (recursive; argument/return w/o pipe; w/o pipeline class)
14: 21,000 ns (spsc pipe)
/*
* Copyright 2012 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
......@@ -13,95 +11,182 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.util.concurrent;
package teetime.util.concurrent.spsc;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
/**
* <ul>
* <li>Lock free, observing single writer principal.
* <li>Replacing the long fields with AtomicLong and using lazySet instead of volatile assignment.
* <li>Using the power of 2 mask, forcing the capacity to next power of 2.
* <li>Adding head and tail cache fields. Avoiding redundant volatile reads.
* <li>Padding head/tail AtomicLong fields. Avoiding false sharing.
* <li>Padding head/tail cache fields. Avoiding false sharing.
* <li>Inlined counters
* <li>Counters are padded
* <li>Data is padded
* <li>Class is pre-padded
* <li>Padding is doubled to dodge pre-fetch
* <li>Use Unsafe to read out of array
* <li>putOrdered into array as Write Memory Barrier
* <li>getVolatile from array as Read Memory Barrier
* <li>ELEMENT_SHIFT is a constant
* </ul>
*/
public final class P1C1QueueOriginal3<E> implements Queue<E> {
private final int capacity;
private final int mask;
private final E[] buffer;
class L0Pad {
public long p00, p01, p02, p03, p04, p05, p06, p07;
public long p30, p31, p32, p33, p34, p35, p36, p37;
}
private final AtomicLong tail = new PaddedAtomicLong(0);
private final AtomicLong head = new PaddedAtomicLong(0);
class ColdFields<E> extends L0Pad {
protected static final int BUFFER_PAD = 32;
protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 2);
protected final int capacity;
protected final long mask;
protected final E[] buffer;
public static class PaddedLong {
public long value = 0, p1, p2, p3, p4, p5, p6;
@SuppressWarnings("unchecked")
public ColdFields(final int capacity) {
if (Pow2.isPowerOf2(capacity)) {
this.capacity = capacity;
}
else {
this.capacity = Pow2.findNextPositivePowerOfTwo(capacity);
}
this.mask = this.capacity - 1;
// pad data on either end with some empty slots.
this.buffer = (E[]) new Object[(this.capacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
}
}
private final PaddedLong tailCache = new PaddedLong();
private final PaddedLong headCache = new PaddedLong();
class L1Pad<E> extends ColdFields<E> {
public long p10, p11, p12, p13, p14, p15, p16;
public long p30, p31, p32, p33, p34, p35, p36, p37;
@SuppressWarnings("unchecked")
public P1C1QueueOriginal3(final int capacity) {
this.capacity = P1C1QueueOriginal3.findNextPositivePowerOfTwo(capacity);
this.mask = this.capacity - 1;
this.buffer = (E[]) new Object[this.capacity];
public L1Pad(final int capacity) {
super(capacity);
}
}
class TailField<E> extends L1Pad<E> {
protected long tail;
public TailField(final int capacity) {
super(capacity);
}
}
class L2Pad<E> extends TailField<E> {
public long p20, p21, p22, p23, p24, p25, p26;
public long p30, p31, p32, p33, p34, p35, p36, p37;
public L2Pad(final int capacity) {
super(capacity);
}
}
class HeadField<E> extends L2Pad<E> {
protected long head;
public HeadField(final int capacity) {
super(capacity);
}
}
class L3Pad<E> extends HeadField<E> {
public long p40, p41, p42, p43, p44, p45, p46;
public long p30, p31, p32, p33, p34, p35, p36, p37;
public L3Pad(final int capacity) {
super(capacity);
}
}
@SuppressWarnings("restriction")
public final class FFBufferOrdered3<E> extends L3Pad<E> implements Queue<E> {
private final static long TAIL_OFFSET;
private final static long HEAD_OFFSET;
private static final long ARRAY_BASE;
private static final int ELEMENT_SHIFT;
static {
try {
TAIL_OFFSET = UnsafeAccess.UNSAFE.objectFieldOffset(TailField.class.getDeclaredField("tail"));
HEAD_OFFSET = UnsafeAccess.UNSAFE.objectFieldOffset(HeadField.class.getDeclaredField("head"));
final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale) {
ELEMENT_SHIFT = 2 + SPARSE_SHIFT;
} else if (8 == scale) {
ELEMENT_SHIFT = 3 + SPARSE_SHIFT;
} else {
throw new IllegalStateException("Unknown pointer size");
}
// Including the buffer pad in the array base offset
ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << (ELEMENT_SHIFT - SPARSE_SHIFT));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
}
public FFBufferOrdered3(final int capacity) {
super(capacity);
}
public static int findNextPositivePowerOfTwo(final int value) {
return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
private long getHead() {
return UnsafeAccess.UNSAFE.getLongVolatile(this, HEAD_OFFSET);
}
private long getTail() {
return UnsafeAccess.UNSAFE.getLongVolatile(this, TAIL_OFFSET);
}
@Override
public boolean add(final E e) {
if (this.offer(e)) {
return true;
}
throw new IllegalStateException("Queue is full");
}
private long elementOffsetInBuffer(final long index) {
return ARRAY_BASE + ((index & this.mask) << ELEMENT_SHIFT);
}
// private boolean available() {
// final long offset = this.elementOffsetInBuffer(this.tail);
// Object object = UnsafeAccess.UNSAFE.getObjectVolatile(this.buffer, offset);
// return object != null;
// }
@Override
public boolean offer(final E e) {
if (null == e) {
throw new NullPointerException("Null is not a valid element");
}
final long currentTail = this.tail.get();
final long wrapPoint = currentTail - this.capacity;
if (this.headCache.value <= wrapPoint) {
this.headCache.value = this.head.get();
if (this.headCache.value <= wrapPoint) {
return false;
}
final long offset = this.elementOffsetInBuffer(this.tail);
if (null != UnsafeAccess.UNSAFE.getObjectVolatile(this.buffer, offset)) {
return false;
}
this.buffer[(int) currentTail & this.mask] = e;
this.tail.lazySet(currentTail + 1);
// STORE/STORE barrier, anything that happens before is visible
// when the value in the buffer is visible
UnsafeAccess.UNSAFE.putOrderedObject(this.buffer, offset, e);
this.tail++;
return true;
}
@Override
public E poll() {
final long currentHead = this.head.get();
if (currentHead >= this.tailCache.value) {
this.tailCache.value = this.tail.get();
if (currentHead >= this.tailCache.value) {
return null;
}
final long offset = this.elementOffsetInBuffer(this.head);
@SuppressWarnings("unchecked")
final E e = (E) UnsafeAccess.UNSAFE.getObjectVolatile(this.buffer, offset);
if (null == e) {
return null;
}
final int index = (int) currentHead & this.mask;
final E e = this.buffer[index];
this.buffer[index] = null;
this.head.lazySet(currentHead + 1);
UnsafeAccess.UNSAFE.putOrderedObject(this.buffer, offset, null);
this.head++;
return e;
}
@Override
public E remove() {
final E e = this.poll();
if (null == e) {
......@@ -111,6 +196,7 @@ public final class P1C1QueueOriginal3<E> implements Queue<E> {
return e;
}
@Override
public E element() {
final E e = this.peek();
if (null == e) {
......@@ -120,25 +206,36 @@ public final class P1C1QueueOriginal3<E> implements Queue<E> {
return e;
}
@Override
public E peek() {
return this.buffer[(int) this.head.get() & this.mask];
long currentHead = this.getHead();
return this.getElement(currentHead);
}
@SuppressWarnings("unchecked")
private E getElement(final long index) {
return (E) UnsafeAccess.UNSAFE.getObject(this.buffer, this.elementOffsetInBuffer(index));
}
@Override
public int size() {
return (int) (this.tail.get() - this.head.get());
return (int) (this.getTail() - this.getHead());
}
@Override
public boolean isEmpty() {
return this.tail.get() == this.head.get();
return this.getTail() == this.getHead();
// return null == this.getHead();
}
@Override
public boolean contains(final Object o) {
if (null == o) {
return false;
}
for (long i = this.head.get(), limit = this.tail.get(); i < limit; i++) {
final E e = this.buffer[(int) i & this.mask];
for (long i = this.getHead(), limit = this.getTail(); i < limit; i++) {
final E e = this.getElement(i);
if (o.equals(e)) {
return true;
}
......@@ -147,22 +244,27 @@ public final class P1C1QueueOriginal3<E> implements Queue<E> {
return false;
}
@Override
public Iterator<E> iterator() {
throw new UnsupportedOperationException();
}
@Override
public Object[] toArray() {
throw new UnsupportedOperationException();
}
@Override
public <T> T[] toArray(final T[] a) {
throw new UnsupportedOperationException();
}
@Override
public boolean remove(final Object o) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsAll(final Collection<?> c) {
for (final Object o : c) {
if (!this.contains(o)) {
......@@ -173,6 +275,7 @@ public final class P1C1QueueOriginal3<E> implements Queue<E> {
return true;
}
@Override
public boolean addAll(final Collection<? extends E> c) {
for (final E e : c) {
this.add(e);
......@@ -181,18 +284,22 @@ public final class P1C1QueueOriginal3<E> implements Queue<E> {
return true;
}
@Override
public boolean removeAll(final Collection<?> c) {
throw new UnsupportedOperationException();
}
@Override
public boolean retainAll(final Collection<?> c) {
throw new UnsupportedOperationException();
}
@Override
public void clear() {
Object value;
do {
value = this.poll();
} while (null != value);
}
}
package teetime.util.concurrent.spsc;
public class Pow2 {
private Pow2() {
// utility class
}
public static int findNextPositivePowerOfTwo(final int value) {
return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
}
public static boolean isPowerOf2(final int value) {
return (value & (value - 1)) == 0;
}
}
package teetime.util.concurrent.spsc;
import java.lang.reflect.Field;
import sun.misc.Unsafe;
@SuppressWarnings("restriction")
class UnsafeAccess {
public static final Unsafe UNSAFE;
static {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
UNSAFE = (Unsafe) field.get(null);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private UnsafeAccess() {
// utility class
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.examples.throughput;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.junit.Before;
import org.junit.Test;
import teetime.examples.throughput.methodcall.MethodCallThroughputAnalysis14;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
import kieker.common.logging.LogFactory;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class MethodCallThoughputTimestampAnalysis14Test {
private static final int NUM_OBJECTS_TO_CREATE = 100000;
private static final int NUM_NOOP_FILTERS = 800;
@Before
public void before() {
System.setProperty(LogFactory.CUSTOM_LOGGER_JVM, "NONE");
}
@Test
public void testWithManyObjects() {
System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS="
+ NUM_NOOP_FILTERS + "...");
final StopWatch stopWatch = new StopWatch();
final List<TimestampObject> timestampObjects = new ArrayList<TimestampObject>(NUM_OBJECTS_TO_CREATE);
final MethodCallThroughputAnalysis14 analysis = new MethodCallThroughputAnalysis14();
analysis.setNumNoopFilters(NUM_NOOP_FILTERS);
analysis.setTimestampObjects(timestampObjects);
analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() {
@Override
public TimestampObject call() throws Exception {
return new TimestampObject();
}
});
analysis.init();
stopWatch.start();
try {
analysis.start();
} finally {
stopWatch.end();
}
StatisticsUtil.printStatistics(stopWatch.getDurationInNs(), timestampObjects);
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.examples.throughput.methodcall;
import java.util.List;
import java.util.concurrent.Callable;
import teetime.examples.throughput.TimestampObject;
import teetime.examples.throughput.methodcall.stage.CollectorSink;
import teetime.examples.throughput.methodcall.stage.NoopFilter;
import teetime.examples.throughput.methodcall.stage.ObjectProducer;
import teetime.examples.throughput.methodcall.stage.Pipeline;
import teetime.examples.throughput.methodcall.stage.StartTimestampFilter;
import teetime.examples.throughput.methodcall.stage.StopTimestampFilter;
import teetime.framework.core.Analysis;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class MethodCallThroughputAnalysis14 extends Analysis {
private long numInputObjects;
private Callable<TimestampObject> inputObjectCreator;
private int numNoopFilters;
private List<TimestampObject> timestampObjects;
private Runnable runnable;
@Override
public void init() {
super.init();
this.runnable = this.buildPipeline();
}
/**
* @param numNoopFilters
* @since 1.10
*/
private Runnable buildPipeline() {
@SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
// create stages
final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(this.numInputObjects, this.inputObjectCreator);
final StartTimestampFilter startTimestampFilter = new StartTimestampFilter();
for (int i = 0; i < noopFilters.length; i++) {
noopFilters[i] = new NoopFilter<TimestampObject>();
}
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects);
final Pipeline<Void, Object> pipeline = new Pipeline<Void, Object>();
pipeline.setFirstStage(objectProducer);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink);
SpScPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
SpScPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
for (int i = 0; i < noopFilters.length - 1; i++) {
SpScPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
}
SpScPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
SpScPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
pipeline.onStart();
// pipeline.getInputPort().pipe = new Pipe<Void>();
// pipeline.getInputPort().pipe.add(new Object());
// pipeline.getOutputPort().pipe = new Pipe<Void>();
final Runnable runnable = new Runnable() {
@Override
public void run() {
do {
pipeline.executeWithPorts();
} while (pipeline.getSchedulingInformation().isActive() && pipeline.isReschedulable());
}
};
return runnable;
}
@Override
public void start() {
super.start();
this.runnable.run();
}
public void setInput(final int numInputObjects, final Callable<TimestampObject> inputObjectCreator) {
this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator;
}
public int getNumNoopFilters() {
return this.numNoopFilters;
}
public void setNumNoopFilters(final int numNoopFilters) {
this.numNoopFilters = numNoopFilters;
}
public List<TimestampObject> getTimestampObjects() {
return this.timestampObjects;
}
public void setTimestampObjects(final List<TimestampObject> timestampObjects) {
this.timestampObjects = timestampObjects;
}
}
package teetime.examples.throughput.methodcall;
import teetime.util.concurrent.spsc.FFBufferOrdered3;
public class SpScPipe<T> implements IPipe<T> {
private final FFBufferOrdered3<T> queue = new FFBufferOrdered3<T>(4);
public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
IPipe<T> pipe = new SpScPipe<T>();
sourcePort.pipe = pipe;
targetPort.pipe = pipe;
}
@Override
public void add(final T element) {
this.queue.offer(element);
}
@Override
public T removeLast() {
return this.queue.poll();
}
@Override
public boolean isEmpty() {
return this.queue.isEmpty();
}
@Override
public int size() {
return this.queue.size();
}
@Override
public T readLast() {
return this.queue.peek();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment