diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index 73696bc8792ba367b6bed8292700d69e56a60288..bab7bdfb1d79606336c3f0d234385de4edfe279a 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -22,7 +22,7 @@ import teetime.framework.StageState; import teetime.framework.exceptionHandling.TerminateException; import teetime.util.framework.concurrent.queue.ObservableSpScArrayQueue; -final class SpScPipe<T> extends AbstractInterThreadPipe<T> implements IMonitorablePipe { +class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe { // private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class); @@ -38,7 +38,7 @@ final class SpScPipe<T> extends AbstractInterThreadPipe<T> implements IMonitorab // BETTER introduce a QueueIsFullStrategy @Override public boolean add(final Object element) { - while (!this.queue.offer(element)) { + while (!addNonBlocking(element)) { // Thread.yield(); if (this.cachedTargetStage.getCurrentState() == StageState.TERMINATED || Thread.currentThread().isInterrupted()) { diff --git a/src/main/java/teetime/util/framework/concurrent/queue/PCBlockingQueue.java b/src/main/java/teetime/util/framework/concurrent/queue/PCBlockingQueue.java index 6ddee04149c2958c9056be7bd203a6b28464b6d3..8d6f3c1231f40222860b69f5c9441ccfab7cddbe 100644 --- a/src/main/java/teetime/util/framework/concurrent/queue/PCBlockingQueue.java +++ b/src/main/java/teetime/util/framework/concurrent/queue/PCBlockingQueue.java @@ -38,20 +38,17 @@ public final class PCBlockingQueue<E> implements BlockingQueue<E> { } @Override - public void put(final E e) throws InterruptedException - { + public void put(final E e) throws InterruptedException { putStrategy.backoffOffer(q, e); } @Override - public E take() throws InterruptedException - { + public E take() throws InterruptedException { return takeStrategy.waitPoll(q); } @Override - public boolean offer(final E e) - { + public boolean offer(final E e) { boolean offered = q.offer(e); if (offered) { takeStrategy.signal(); @@ -60,8 +57,7 @@ public final class PCBlockingQueue<E> implements BlockingQueue<E> { } @Override - public E poll() - { + public E poll() { E e = q.poll(); if (e != null) { putStrategy.signal(); @@ -85,12 +81,10 @@ public final class PCBlockingQueue<E> implements BlockingQueue<E> { } @Override - public int drainTo(final Collection<? super E> c) - { + public int drainTo(final Collection<? super E> c) { int count = 0; E e; - while ((e = poll()) != null) - { + while ((e = poll()) != null) { c.add(e); count++; } @@ -98,12 +92,10 @@ public final class PCBlockingQueue<E> implements BlockingQueue<E> { } @Override - public int drainTo(final Collection<? super E> c, final int maxElements) - { + public int drainTo(final Collection<? super E> c, final int maxElements) { int count = 0; E e; - while (((e = poll()) != null) && count < maxElements) - { + while (((e = poll()) != null) && count < maxElements) { c.add(e); count++; } diff --git a/src/main/java/teetime/util/framework/concurrent/queue/takestrategy/SCBundleTakeStrategy.java b/src/main/java/teetime/util/framework/concurrent/queue/takestrategy/SCBundleTakeStrategy.java new file mode 100644 index 0000000000000000000000000000000000000000..20c1fae18bce9a4462017b82fed6ea312dd234f5 --- /dev/null +++ b/src/main/java/teetime/util/framework/concurrent/queue/takestrategy/SCBundleTakeStrategy.java @@ -0,0 +1,79 @@ +package teetime.util.framework.concurrent.queue.takestrategy; + +import java.util.Queue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; + +public class SCBundleTakeStrategy<E> implements TakeStrategy<E> { + + public volatile int storeFence = 0; // NOCS + + private final AtomicReference<Thread> t = new AtomicReference<Thread>(null); + private final int commitThreshold; + + private volatile long timeoutInNs; + + private int numUncommittedElements; + + public SCBundleTakeStrategy(final int commitThreshold, final long timeoutInNs, final int numUncommittedElements) { + if (commitThreshold < 1) { + throw new IllegalArgumentException("commitThreshold is non-positive. Only positive values are permitted."); + } + this.commitThreshold = commitThreshold; + if (timeoutInNs < 1) { + throw new IllegalArgumentException("timeoutInNs is non-positive. Only positive values are permitted."); + } + this.timeoutInNs = timeoutInNs; + } + + @Override + public void signal() { + numUncommittedElements++; + if (numUncommittedElements == commitThreshold) { + numUncommittedElements = 0; + storeFence = 1; // store barrier + + LockSupport.unpark(t.get()); // t.get() load barrier + } + } + + @Override + public E waitPoll(final Queue<E> q) throws InterruptedException { + E element = q.poll(); + if (element != null) { + return element; + } + + t.set(Thread.currentThread()); + + do { + LockSupport.parkNanos(timeoutInNs); + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Interrupted while waiting for the queue to become non-empty."); + } + + t.lazySet(null); + + element = q.poll(); + } while (element == null); + + return element; + } + + public int getCommitThreshold() { + return commitThreshold; + } + + public long getTimeoutInMs() { + return timeoutInNs; + } + + /** + * Thread-safe setter + * + * @param timeoutInMs + */ + public void setTimeoutInMs(final long timeoutInMs) { + this.timeoutInNs = timeoutInMs; + } +}