From 0fba96d7996ceb5d5c42f88b1dd461dbdbdceacc Mon Sep 17 00:00:00 2001 From: Nelson Tavares de Sousa <stu103017@mail.uni-kiel.de> Date: Thu, 16 Oct 2014 15:25:50 +0200 Subject: [PATCH] changed signal field to a ConcurrentQueue --- .../teetime/framework/pipe/InterThreadPipe.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/main/java/teetime/framework/pipe/InterThreadPipe.java b/src/main/java/teetime/framework/pipe/InterThreadPipe.java index 377b01b1..0825c280 100644 --- a/src/main/java/teetime/framework/pipe/InterThreadPipe.java +++ b/src/main/java/teetime/framework/pipe/InterThreadPipe.java @@ -1,6 +1,11 @@ package teetime.framework.pipe; -import java.util.concurrent.atomic.AtomicReference; +import java.util.Queue; + +import org.jctools.queues.QueueFactory; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.jctools.queues.spec.Preference; import teetime.framework.InputPort; import teetime.framework.OutputPort; @@ -8,7 +13,7 @@ import teetime.framework.signal.ISignal; public abstract class InterThreadPipe extends AbstractPipe { - private final AtomicReference<ISignal> signal = new AtomicReference<ISignal>(); + private final Queue<ISignal> signalQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));; <T> InterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { super(sourcePort, targetPort); @@ -16,11 +21,11 @@ public abstract class InterThreadPipe extends AbstractPipe { @Override public void setSignal(final ISignal signal) { - this.signal.lazySet(signal); // lazySet is legal due to our single-writer requirement + this.signalQueue.offer(signal); } public ISignal getSignal() { - return this.signal.get(); + return this.signalQueue.poll(); } @Override -- GitLab