diff --git a/src/main/java/teetime/framework/OutputPort.java b/src/main/java/teetime/framework/OutputPort.java index e53b5aff33ed13d841d4fee88a175ef5610827ae..45a8de638a765f72d408dc5e84f856feac624ab6 100644 --- a/src/main/java/teetime/framework/OutputPort.java +++ b/src/main/java/teetime/framework/OutputPort.java @@ -18,7 +18,7 @@ public final class OutputPort<T> extends AbstractPort<T> { } public void sendSignal(final ISignal signal) { - this.pipe.setSignal(signal); + this.pipe.sendSignal(signal); } public void reportNewElement() { diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java index 98629184ddedf7555193be0167ec371b018a6427..716046dc3242f4f5526bddb163b45bb97ba4fcca 100644 --- a/src/main/java/teetime/framework/pipe/DummyPipe.java +++ b/src/main/java/teetime/framework/pipe/DummyPipe.java @@ -44,7 +44,7 @@ public final class DummyPipe implements IPipe { } @Override - public void setSignal(final ISignal signal) {} + public void sendSignal(final ISignal signal) {} @Override public void connectPorts(final OutputPort sourcePort, final InputPort targetPort) {} diff --git a/src/main/java/teetime/framework/pipe/IPipe.java b/src/main/java/teetime/framework/pipe/IPipe.java index 1e9d2f4399430b35c7eb8e0efeb2a089529058b7..8fee03e759208e92c3d7fef5d2c04414297ec804 100644 --- a/src/main/java/teetime/framework/pipe/IPipe.java +++ b/src/main/java/teetime/framework/pipe/IPipe.java @@ -18,7 +18,7 @@ public interface IPipe { InputPort<?> getTargetPort(); - void setSignal(ISignal signal); + void sendSignal(ISignal signal); @Deprecated <T> void connectPorts(OutputPort<? extends T> sourcePort, InputPort<T> targetPort); diff --git a/src/main/java/teetime/framework/pipe/InterThreadPipe.java b/src/main/java/teetime/framework/pipe/InterThreadPipe.java index 377b01b181eda99dad0c09a9633f817d1fc7da8e..a38e59519e8e3a16c2027fb0b14ff757273b75a3 100644 --- a/src/main/java/teetime/framework/pipe/InterThreadPipe.java +++ b/src/main/java/teetime/framework/pipe/InterThreadPipe.java @@ -1,6 +1,11 @@ package teetime.framework.pipe; -import java.util.concurrent.atomic.AtomicReference; +import java.util.Queue; + +import org.jctools.queues.QueueFactory; +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.jctools.queues.spec.Preference; import teetime.framework.InputPort; import teetime.framework.OutputPort; @@ -8,19 +13,24 @@ import teetime.framework.signal.ISignal; public abstract class InterThreadPipe extends AbstractPipe { - private final AtomicReference<ISignal> signal = new AtomicReference<ISignal>(); + private final Queue<ISignal> signalQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT)); <T> InterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { super(sourcePort, targetPort); } @Override - public void setSignal(final ISignal signal) { - this.signal.lazySet(signal); // lazySet is legal due to our single-writer requirement + public void sendSignal(final ISignal signal) { + this.signalQueue.offer(signal); } + /** + * Retrieves and removes the head of the signal queue + * + * @return Head of signal queue, <code>null</code> if signal queue is empty. + */ public ISignal getSignal() { - return this.signal.get(); + return this.signalQueue.poll(); } @Override diff --git a/src/main/java/teetime/framework/pipe/IntraThreadPipe.java b/src/main/java/teetime/framework/pipe/IntraThreadPipe.java index 27a7aa2adae1b0755acf2a6e0b5193df066c7974..42257bc9085c363c0604c4a60cf093e88f9f2bcf 100644 --- a/src/main/java/teetime/framework/pipe/IntraThreadPipe.java +++ b/src/main/java/teetime/framework/pipe/IntraThreadPipe.java @@ -11,7 +11,7 @@ public abstract class IntraThreadPipe extends AbstractPipe { } @Override - public void setSignal(final ISignal signal) { + public void sendSignal(final ISignal signal) { if (this.getTargetPort() != null) { // BETTER remove this check since there are DummyPorts this.cachedTargetStage.onSignal(signal, this.getTargetPort()); } diff --git a/src/main/java/teetime/stage/IterableProducer.java b/src/main/java/teetime/stage/IterableProducer.java new file mode 100644 index 0000000000000000000000000000000000000000..7d9eadde0d7e7d686de39a6f0378acd776afb8d8 --- /dev/null +++ b/src/main/java/teetime/stage/IterableProducer.java @@ -0,0 +1,21 @@ +package teetime.stage; + +import teetime.framework.ProducerStage; + +public class IterableProducer<O extends Iterable<T>, T> extends ProducerStage<T> { + + private O iter = null; + + public IterableProducer(final O iter) { + this.iter = iter; + } + + @Override + protected void execute() { + for (T i : iter) { + this.send(this.outputPort, i); + } + + } + +} diff --git a/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java b/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java index a6464885ebe837df536e5697e20e9dde4d0a6bc5..b61ed4dabf0fb502c0e3d9177eb397744a2cf88d 100644 --- a/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java +++ b/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java @@ -149,7 +149,7 @@ public class MethodCallThroughputAnalysis17 { final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); IPipe startPipe = new RelayTestPipe<TimestampObject>(this.numInputObjects, this.inputObjectCreator); - startPipe.setSignal(new TerminatingSignal()); + startPipe.sendSignal(new TerminatingSignal()); relay.getInputPort().setPipe(startPipe); UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort()); diff --git a/src/test/java/teetime/framework/pipe/SpScPipeTest.java b/src/test/java/teetime/framework/pipe/SpScPipeTest.java new file mode 100644 index 0000000000000000000000000000000000000000..1a937afc2b63f77a2cb4fc36398d69e774566b97 --- /dev/null +++ b/src/test/java/teetime/framework/pipe/SpScPipeTest.java @@ -0,0 +1,49 @@ +package teetime.framework.pipe; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; + +import teetime.framework.InputPort; +import teetime.framework.OutputPort; +import teetime.framework.signal.ISignal; +import teetime.framework.signal.StartingSignal; +import teetime.framework.signal.TerminatingSignal; +import teetime.framework.signal.ValidatingSignal; + +public class SpScPipeTest { + + @Test + public void testSignalOrdering() throws Exception { + OutputPort<? extends Object> sourcePort = null; + InputPort<Object> targetPort = null; + InterThreadPipe pipe = new SpScPipe(sourcePort, targetPort, 1); // IPipe does not provide getSignal method + + List<ISignal> list = new ArrayList<ISignal>(); + list.add(new StartingSignal()); + list.add(new TerminatingSignal()); + list.add(new ValidatingSignal()); + list.add(new StartingSignal()); + list.add(new TerminatingSignal()); + list.add(new ValidatingSignal()); + list.add(new StartingSignal()); + list.add(new TerminatingSignal()); + list.add(new ValidatingSignal()); + + for (ISignal s : list) { + pipe.sendSignal(s); + } + + List<ISignal> secondList = new ArrayList<ISignal>(); + while (true) { + ISignal temp = pipe.getSignal(); + if (temp == null) { + break; + } + secondList.add(temp); + } + Assert.assertEquals(list, secondList); + } +}