diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index 83c2a52faaff85c3697de2b66e3f886f882791b9..84828cf0256a228e0372318e98028fe081417f9b 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -53,6 +53,7 @@ public abstract class AbstractStage implements StageWithPort { outputPort.reportNewElement(); return true; + // return outputPort.send(element); } @SuppressWarnings("unchecked") diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java similarity index 91% rename from src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java rename to src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java index 2dd45c930656c604aa28426dbf55b33a3211d10c..5490feb101c239ba6ae526b9bfe338ceb862bf4b 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java @@ -4,19 +4,19 @@ import teetime.util.list.CommittableResizableArrayQueue; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class Pipe<T> extends IntraThreadPipe<T> { +public final class CommittablePipe<T> extends IntraThreadPipe<T> { private final CommittableResizableArrayQueue<T> elements = new CommittableResizableArrayQueue<T>(null, 4); @Deprecated public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { - IPipe<T> pipe = new Pipe<T>(); + IPipe<T> pipe = new CommittablePipe<T>(); pipe.connectPorts(sourcePort, targetPort); } /* * (non-Javadoc) - * + * * @see teetime.examples.throughput.methodcall.IPipe#add(T) */ @Override @@ -28,7 +28,7 @@ public class Pipe<T> extends IntraThreadPipe<T> { /* * (non-Javadoc) - * + * * @see teetime.examples.throughput.methodcall.IPipe#removeLast() */ @Override @@ -40,7 +40,7 @@ public class Pipe<T> extends IntraThreadPipe<T> { /* * (non-Javadoc) - * + * * @see teetime.examples.throughput.methodcall.IPipe#isEmpty() */ @Override @@ -50,7 +50,7 @@ public class Pipe<T> extends IntraThreadPipe<T> { /* * (non-Javadoc) - * + * * @see teetime.examples.throughput.methodcall.IPipe#readLast() */ @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java new file mode 100644 index 0000000000000000000000000000000000000000..abc3ff3b4d40b13c77ef927812afa59f716ab5ab --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java @@ -0,0 +1,24 @@ +package teetime.variant.methodcallWithPorts.framework.core.pipe; + +import java.util.concurrent.atomic.AtomicReference; + +import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; + +public abstract class InterThreadPipe<T> extends AbstractPipe<T> { + + private final AtomicReference<Signal> signal = new AtomicReference<Signal>(); + + @Override + public void setSignal(final Signal signal) { + this.signal.lazySet(signal); // lazySet is legal due to our single-writer requirement + } + + public Signal getSignal() { + return this.signal.get(); + } + + @Override + public void reportNewElement() { + // do nothing + } +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java index 7a6b4aa0019fb32fc0b2acce7fc0e71a7df9a9dc..86f2b99102a205c3fecdd43bd9ffe2f9766388f4 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java @@ -4,7 +4,7 @@ import teetime.util.concurrent.workstealing.CircularArray; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> { +public final class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> { private CircularArray<T> elements; private int head; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java new file mode 100644 index 0000000000000000000000000000000000000000..d7ab5e83c715369c5e36cc7854cb7360ae589131 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java @@ -0,0 +1,46 @@ +package teetime.variant.methodcallWithPorts.framework.core.pipe; + +import teetime.util.ConstructorClosure; + +public final class RelayTestPipe<T> extends InterThreadPipe<T> { + + private int numInputObjects; + private final ConstructorClosure<T> inputObjectCreator; + + public RelayTestPipe(final int numInputObjects, + final ConstructorClosure<T> inputObjectCreator) { + this.numInputObjects = numInputObjects; + this.inputObjectCreator = inputObjectCreator; + } + + @Override + public boolean add(final T element) { + return false; + } + + @Override + public T removeLast() { + if (this.numInputObjects == 0) { + return null; + } else { + this.numInputObjects--; + return this.inputObjectCreator.create(); + } + } + + @Override + public boolean isEmpty() { + return (this.numInputObjects == 0); + } + + @Override + public int size() { + return this.numInputObjects; + } + + @Override + public T readLast() { + return null; + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java index 96eeba861d7374fc668a0f76984156a29c23bedd..1d48809f6574a3ea7ea696065159cb7e7804c993 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java @@ -3,10 +3,14 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class SingleElementPipe<T> extends IntraThreadPipe<T> { +public final class SingleElementPipe<T> extends IntraThreadPipe<T> { private T element; + SingleElementPipe() { + super(); + } + @Deprecated public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { IPipe<T> pipe = new SingleElementPipe<T>(); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java index 9153edaf0a8a47fff893c6aec7ec13f67f98b435..4999edef4501b2265ab347217ee4d83d74583364 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java @@ -1,7 +1,6 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import java.util.Queue; -import java.util.concurrent.atomic.AtomicReference; import org.jctools.queues.QueueFactory; import org.jctools.queues.spec.ConcurrentQueueSpec; @@ -10,12 +9,10 @@ import org.jctools.queues.spec.Preference; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; -public class SpScPipe<T> extends AbstractPipe<T> { +public final class SpScPipe<T> extends InterThreadPipe<T> { private final Queue<T> queue; - private final AtomicReference<Signal> signal = new AtomicReference<Signal>(); // statistics private int numWaits; @@ -67,18 +64,4 @@ public class SpScPipe<T> extends AbstractPipe<T> { return this.numWaits; } - @Override - public void setSignal(final Signal signal) { - this.signal.lazySet(signal); // lazySet is legal due to our single-writer requirement - } - - public Signal getSignal() { - return this.signal.get(); - } - - @Override - public void reportNewElement() { - // do nothing - } - } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java index 3befc87d87b8b09f6a107de743eb9f5c3915e171..357762b45c5f3f3bafbd47e8bd1ac43218f2395e 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java @@ -3,7 +3,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> { +public final class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> { private final int MIN_CAPACITY; @@ -12,7 +12,7 @@ public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> { private int lastFreeIndex; @SuppressWarnings("unchecked") - public UnorderedGrowablePipe() { + UnorderedGrowablePipe() { this.MIN_CAPACITY = 4; this.elements = (T[]) new Object[this.MIN_CAPACITY]; } @@ -23,12 +23,6 @@ public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> { pipe.connectPorts(sourcePort, targetPort); } - @Override - public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { - sourcePort.setPipe(this); - targetPort.setPipe(this); - } - @Override public boolean add(final T element) { if (this.lastFreeIndex == this.elements.length) { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java index d31041fd342afe845c28c0f5d72d010b38eefbd1..2c525299c1e31c5d31be3c96416dc6ec7da3da83 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java @@ -2,14 +2,14 @@ package teetime.variant.methodcallWithPorts.stage; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; -import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.InterThreadPipe; import teetime.variant.methodcallWithPorts.framework.core.signal.TerminatingSignal; public class Relay<T> extends ProducerStage<T> { private final InputPort<T> inputPort = this.createInputPort(); - private SpScPipe<T> cachedCastedInputPipe; + private InterThreadPipe<T> cachedCastedInputPipe; @Override public void execute() { @@ -26,7 +26,7 @@ public class Relay<T> extends ProducerStage<T> { @Override public void onStarting() { - this.cachedCastedInputPipe = (SpScPipe<T>) this.inputPort.getPipe(); + this.cachedCastedInputPipe = (InterThreadPipe<T>) this.inputPort.getPipe(); super.onStarting(); } diff --git a/src/main/java/util/PerformanceTest.java b/src/main/java/util/PerformanceTest.java index 0887ae91af09f5bcf3ecdb3cbcb007b7f0973b60..7438d476ee923d5ede2e2aa13a5e8104049d665c 100644 --- a/src/main/java/util/PerformanceTest.java +++ b/src/main/java/util/PerformanceTest.java @@ -17,7 +17,7 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; public abstract class PerformanceTest { - protected static final int NUM_OBJECTS_TO_CREATE = 100000; + protected static final int NUM_OBJECTS_TO_CREATE = 1000000; protected static final int NUM_NOOP_FILTERS = 800; public static final MeasurementRepository measurementRepository = new MeasurementRepository(); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java index f55156f57aece8215af8aeb5e73968c192c9b451..117954b87f57dbc4c10ede61dba0ee7bf94ccb2d 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java @@ -23,7 +23,7 @@ import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.HeadStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; -import teetime.variant.methodcallWithPorts.framework.core.pipe.Pipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.CommittablePipe; import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.NoopFilter; import teetime.variant.methodcallWithPorts.stage.ObjectProducer; @@ -71,13 +71,13 @@ public class MethodCallThroughputAnalysis9 extends Analysis { pipeline.setFirstStage(objectProducer); pipeline.setLastStage(collectorSink); - Pipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); - Pipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); + CommittablePipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); + CommittablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); for (int i = 0; i < noopFilters.length - 1; i++) { - Pipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); + CommittablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); } - Pipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); - Pipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); + CommittablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); + CommittablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); return pipeline; } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/ChwHomePerformanceCheck.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/ChwHomePerformanceCheck.java index 8c73c4152ff25133a972320e013a73b161edb8b7..8e3fe93b52879c0cd8103da5fa1e393258cc500b 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/ChwHomePerformanceCheck.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/ChwHomePerformanceCheck.java @@ -28,6 +28,6 @@ public class ChwHomePerformanceCheck implements PerformanceCheckProfile { System.out.println("speedupC: " + speedupC); assertEquals(2, speedupB, 0.3); - assertEquals(3.5, speedupC, 0.3); + assertEquals(3.6, speedupC, 0.3); } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java index 1cf3daf788819070136e315b1fdcf93dbca4247e..7d662b6ae47d9ab629af537b606ab1c2978ea0a9 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java @@ -25,10 +25,10 @@ import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; +import teetime.variant.methodcallWithPorts.framework.core.pipe.DummyPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory; -import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; -import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; +import teetime.variant.methodcallWithPorts.framework.core.pipe.RelayTestPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe; import teetime.variant.methodcallWithPorts.framework.core.signal.TerminatingSignal; import teetime.variant.methodcallWithPorts.stage.CollectorSink; @@ -110,6 +110,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { super.init(); } + @SuppressWarnings("unchecked") private HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator); @@ -117,21 +118,19 @@ public class MethodCallThroughputAnalysis17 extends Analysis { Sink<TimestampObject> sink = new Sink<TimestampObject>(); Sink<Void> endStage = new Sink<Void>(); + // UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), sink.getInputPort()); + // objectProducer.getOutputPort().pipe = new UnorderedGrowablePipe<TimestampObject>(); + + UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), distributor.getInputPort()); + distributor.getNewOutputPort().setPipe(new DummyPipe()); + final HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>(); pipeline.setFirstStage(objectProducer); // pipeline.setFirstStage(sink); // pipeline.setFirstStage(endStage); - pipeline.setLastStage(distributor); // pipeline.setLastStage(sink); // pipeline.setLastStage(new EndStage<TimestampObject>()); - - // UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), sink.getInputPort()); - // objectProducer.getOutputPort().pipe = new UnorderedGrowablePipe<TimestampObject>(); - - UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), distributor.getInputPort()); - distributor.getNewOutputPort().setPipe(new UnorderedGrowablePipe<TimestampObject>()); - return pipeline; } @@ -152,21 +151,11 @@ public class MethodCallThroughputAnalysis17 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); - final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); - pipeline.setFirstStage(relay); - pipeline.setLastStage(collectorSink); - - IPipe<TimestampObject> pipe = this.pipeFactory.create(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false, SPSC_INITIAL_CAPACITY); - relay.getInputPort().setPipe(pipe); - IPipe<TimestampObject> startPipe = relay.getInputPort().getPipe(); - for (int i = 0; i < this.numInputObjects; i++) { - startPipe.add(this.inputObjectCreator.create()); - } - // startPipe.close(); + IPipe<TimestampObject> startPipe = new RelayTestPipe<TimestampObject>(this.numInputObjects, this.inputObjectCreator); startPipe.setSignal(new TerminatingSignal()); + relay.getInputPort().setPipe(startPipe); UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort()); - UnorderedGrowablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); for (int i = 0; i < noopFilters.length - 1; i++) { UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); @@ -174,6 +163,9 @@ public class MethodCallThroughputAnalysis17 extends Analysis { UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); + final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); + pipeline.setFirstStage(relay); + pipeline.setLastStage(collectorSink); return pipeline; }