diff --git a/src/main/java/teetime/framework/pipe/InterThreadPipe.java b/src/main/java/teetime/framework/pipe/InterThreadPipe.java index 377b01b181eda99dad0c09a9633f817d1fc7da8e..0825c2809b8d72590c4f3852a80b825b87f44124 100644 --- a/src/main/java/teetime/framework/pipe/InterThreadPipe.java +++ b/src/main/java/teetime/framework/pipe/InterThreadPipe.java @@ -1,6 +1,11 @@ package teetime.framework.pipe; -import java.util.concurrent.atomic.AtomicReference; +import java.util.Queue; + +import org.jctools.queues.QueueFactory; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.jctools.queues.spec.Preference; import teetime.framework.InputPort; import teetime.framework.OutputPort; @@ -8,7 +13,7 @@ import teetime.framework.signal.ISignal; public abstract class InterThreadPipe extends AbstractPipe { - private final AtomicReference<ISignal> signal = new AtomicReference<ISignal>(); + private final Queue<ISignal> signalQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));; <T> InterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { super(sourcePort, targetPort); @@ -16,11 +21,11 @@ public abstract class InterThreadPipe extends AbstractPipe { @Override public void setSignal(final ISignal signal) { - this.signal.lazySet(signal); // lazySet is legal due to our single-writer requirement + this.signalQueue.offer(signal); } public ISignal getSignal() { - return this.signal.get(); + return this.signalQueue.poll(); } @Override