Skip to content
Snippets Groups Projects
Commit cab4c212 authored by Christian Wulf's avatar Christian Wulf
Browse files

added SCBundleTakeStrategy

parent bb137f4a
No related branches found
No related tags found
No related merge requests found
...@@ -22,7 +22,7 @@ import teetime.framework.StageState; ...@@ -22,7 +22,7 @@ import teetime.framework.StageState;
import teetime.framework.exceptionHandling.TerminateException; import teetime.framework.exceptionHandling.TerminateException;
import teetime.util.framework.concurrent.queue.ObservableSpScArrayQueue; import teetime.util.framework.concurrent.queue.ObservableSpScArrayQueue;
final class SpScPipe<T> extends AbstractInterThreadPipe<T> implements IMonitorablePipe { class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe {
// private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class); // private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class);
...@@ -38,7 +38,7 @@ final class SpScPipe<T> extends AbstractInterThreadPipe<T> implements IMonitorab ...@@ -38,7 +38,7 @@ final class SpScPipe<T> extends AbstractInterThreadPipe<T> implements IMonitorab
// BETTER introduce a QueueIsFullStrategy // BETTER introduce a QueueIsFullStrategy
@Override @Override
public boolean add(final Object element) { public boolean add(final Object element) {
while (!this.queue.offer(element)) { while (!addNonBlocking(element)) {
// Thread.yield(); // Thread.yield();
if (this.cachedTargetStage.getCurrentState() == StageState.TERMINATED || if (this.cachedTargetStage.getCurrentState() == StageState.TERMINATED ||
Thread.currentThread().isInterrupted()) { Thread.currentThread().isInterrupted()) {
......
...@@ -38,20 +38,17 @@ public final class PCBlockingQueue<E> implements BlockingQueue<E> { ...@@ -38,20 +38,17 @@ public final class PCBlockingQueue<E> implements BlockingQueue<E> {
} }
@Override @Override
public void put(final E e) throws InterruptedException public void put(final E e) throws InterruptedException {
{
putStrategy.backoffOffer(q, e); putStrategy.backoffOffer(q, e);
} }
@Override @Override
public E take() throws InterruptedException public E take() throws InterruptedException {
{
return takeStrategy.waitPoll(q); return takeStrategy.waitPoll(q);
} }
@Override @Override
public boolean offer(final E e) public boolean offer(final E e) {
{
boolean offered = q.offer(e); boolean offered = q.offer(e);
if (offered) { if (offered) {
takeStrategy.signal(); takeStrategy.signal();
...@@ -60,8 +57,7 @@ public final class PCBlockingQueue<E> implements BlockingQueue<E> { ...@@ -60,8 +57,7 @@ public final class PCBlockingQueue<E> implements BlockingQueue<E> {
} }
@Override @Override
public E poll() public E poll() {
{
E e = q.poll(); E e = q.poll();
if (e != null) { if (e != null) {
putStrategy.signal(); putStrategy.signal();
...@@ -85,12 +81,10 @@ public final class PCBlockingQueue<E> implements BlockingQueue<E> { ...@@ -85,12 +81,10 @@ public final class PCBlockingQueue<E> implements BlockingQueue<E> {
} }
@Override @Override
public int drainTo(final Collection<? super E> c) public int drainTo(final Collection<? super E> c) {
{
int count = 0; int count = 0;
E e; E e;
while ((e = poll()) != null) while ((e = poll()) != null) {
{
c.add(e); c.add(e);
count++; count++;
} }
...@@ -98,12 +92,10 @@ public final class PCBlockingQueue<E> implements BlockingQueue<E> { ...@@ -98,12 +92,10 @@ public final class PCBlockingQueue<E> implements BlockingQueue<E> {
} }
@Override @Override
public int drainTo(final Collection<? super E> c, final int maxElements) public int drainTo(final Collection<? super E> c, final int maxElements) {
{
int count = 0; int count = 0;
E e; E e;
while (((e = poll()) != null) && count < maxElements) while (((e = poll()) != null) && count < maxElements) {
{
c.add(e); c.add(e);
count++; count++;
} }
......
package teetime.util.framework.concurrent.queue.takestrategy;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
public class SCBundleTakeStrategy<E> implements TakeStrategy<E> {
public volatile int storeFence = 0; // NOCS
private final AtomicReference<Thread> t = new AtomicReference<Thread>(null);
private final int commitThreshold;
private volatile long timeoutInNs;
private int numUncommittedElements;
public SCBundleTakeStrategy(final int commitThreshold, final long timeoutInNs, final int numUncommittedElements) {
if (commitThreshold < 1) {
throw new IllegalArgumentException("commitThreshold is non-positive. Only positive values are permitted.");
}
this.commitThreshold = commitThreshold;
if (timeoutInNs < 1) {
throw new IllegalArgumentException("timeoutInNs is non-positive. Only positive values are permitted.");
}
this.timeoutInNs = timeoutInNs;
}
@Override
public void signal() {
numUncommittedElements++;
if (numUncommittedElements == commitThreshold) {
numUncommittedElements = 0;
storeFence = 1; // store barrier
LockSupport.unpark(t.get()); // t.get() load barrier
}
}
@Override
public E waitPoll(final Queue<E> q) throws InterruptedException {
E element = q.poll();
if (element != null) {
return element;
}
t.set(Thread.currentThread());
do {
LockSupport.parkNanos(timeoutInNs);
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Interrupted while waiting for the queue to become non-empty.");
}
t.lazySet(null);
element = q.poll();
} while (element == null);
return element;
}
public int getCommitThreshold() {
return commitThreshold;
}
public long getTimeoutInMs() {
return timeoutInNs;
}
/**
* Thread-safe setter
*
* @param timeoutInMs
*/
public void setTimeoutInMs(final long timeoutInMs) {
this.timeoutInNs = timeoutInMs;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment