diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java index 3336d636266bc92c6d74d5caefaef85e5f69814c..8786ef85ebfb03214beea7f40ff2fcd2758aa983 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java @@ -38,8 +38,8 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { private volatile boolean closed; - protected <T> AbstractInterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - super(sourcePort, targetPort); + protected <T> AbstractInterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + super(sourcePort, targetPort, capacity); final Queue<ISignal> localSignalQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT)); final PutStrategy<ISignal> putStrategy = new YieldPutStrategy<ISignal>(); final TakeStrategy<ISignal> takeStrategy = new SCParkTakeStrategy<ISignal>(); diff --git a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java index 8108cd5d85adff6f127f4f01134911b72acbcafe..a12acf711a938fdc861784501f0806404735c27c 100644 --- a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java @@ -21,8 +21,8 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe { private boolean closed; - protected <T> AbstractIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - super(sourcePort, targetPort); + protected <T> AbstractIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + super(sourcePort, targetPort, capacity); } @Override diff --git a/src/main/java/teetime/framework/AbstractPipe.java b/src/main/java/teetime/framework/AbstractPipe.java index 19df8e3a096b218a77146cea1b94f68916896478..2a6ea3bb595aa6031161e1cc6cd22f518c5592c5 100644 --- a/src/main/java/teetime/framework/AbstractPipe.java +++ b/src/main/java/teetime/framework/AbstractPipe.java @@ -30,8 +30,10 @@ public abstract class AbstractPipe implements IPipe { private final OutputPort<?> sourcePort; private final InputPort<?> targetPort; + @SuppressWarnings("PMD.AvoidFieldNameMatchingMethodName") + private final int capacity; - protected <T> AbstractPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + protected <T> AbstractPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { if (sourcePort == null) { throw new IllegalArgumentException("sourcePort may not be null"); } @@ -44,16 +46,17 @@ public abstract class AbstractPipe implements IPipe { this.sourcePort = sourcePort; this.targetPort = targetPort; + this.capacity = capacity; this.cachedTargetStage = targetPort.getOwningStage(); } @Override - public OutputPort<?> getSourcePort() { + public final OutputPort<?> getSourcePort() { return sourcePort; } @Override - public InputPort<?> getTargetPort() { + public final InputPort<?> getTargetPort() { return targetPort; } @@ -61,4 +64,9 @@ public abstract class AbstractPipe implements IPipe { public final boolean hasMore() { return !isEmpty(); } + + @Override + public final int capacity() { + return capacity; + } } diff --git a/src/main/java/teetime/framework/ExecutionInstantiation.java b/src/main/java/teetime/framework/ExecutionInstantiation.java index cce003efbed341c2de6266cbdaf4a83955fb7518..63a57c3d9dc2e8e7840d7a0105aecdb4f3796041 100644 --- a/src/main/java/teetime/framework/ExecutionInstantiation.java +++ b/src/main/java/teetime/framework/ExecutionInstantiation.java @@ -72,8 +72,8 @@ class ExecutionInstantiation { } if (threadableStages.contains(targetStage) && targetColor != color) { - if (pipe.getCapacity() != 0) { - interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.getCapacity()); + if (pipe.capacity() != 0) { + interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.capacity()); } else { interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4); } diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java index c67015f1c4f5ebf6293016429e52ddeaf78b3e07..8381869ed80adbb757d11ca469a4d733ae2852f4 100644 --- a/src/main/java/teetime/framework/pipe/DummyPipe.java +++ b/src/main/java/teetime/framework/pipe/DummyPipe.java @@ -95,4 +95,9 @@ public final class DummyPipe implements IPipe { } + @Override + public int capacity() { + return 0; + } + } diff --git a/src/main/java/teetime/framework/pipe/IMonitorablePipe.java b/src/main/java/teetime/framework/pipe/IMonitorablePipe.java index 0ca5d9c3451429d19129e016c10afc8102be62d0..db61ad140c096e8238b24615e7414963d5583fd8 100644 --- a/src/main/java/teetime/framework/pipe/IMonitorablePipe.java +++ b/src/main/java/teetime/framework/pipe/IMonitorablePipe.java @@ -23,6 +23,8 @@ public interface IMonitorablePipe { int size(); + int capacity(); + long getPushThroughput(); long getPullThroughput(); diff --git a/src/main/java/teetime/framework/pipe/IPipe.java b/src/main/java/teetime/framework/pipe/IPipe.java index a28a3419b4c9bee5352d20ab0c7169ec32c2d940..de1cd908e6b6d1c9b66a73242a0f8b58f8a80a5b 100644 --- a/src/main/java/teetime/framework/pipe/IPipe.java +++ b/src/main/java/teetime/framework/pipe/IPipe.java @@ -50,10 +50,15 @@ public interface IPipe { boolean isEmpty(); /** - * @return the current number of elements + * @return the current number of elements held by this pipe instance */ int size(); + /** + * @return the maximum number of elements possible to hold by this pipe instance + */ + int capacity(); + /** * Retrieves the last element of the pipe and deletes it. * diff --git a/src/main/java/teetime/framework/pipe/InstantiationPipe.java b/src/main/java/teetime/framework/pipe/InstantiationPipe.java index de753e9e47791848a41b1ad894a4ecc19b621e02..ab02f62d222e86e85591568f347b81ea46f65b2a 100644 --- a/src/main/java/teetime/framework/pipe/InstantiationPipe.java +++ b/src/main/java/teetime/framework/pipe/InstantiationPipe.java @@ -35,7 +35,8 @@ public class InstantiationPipe implements IPipe { targetPort.setPipe(this); } - public int getCapacity() { + @Override + public int capacity() { return capacity; } diff --git a/src/main/java/teetime/framework/pipe/RelayTestPipe.java b/src/main/java/teetime/framework/pipe/RelayTestPipe.java index 1922e40dd0bfb8de3c399c21cb4b98f20597d145..20e99adbc8cf1b3da8da9baa4a9464057f0fadc5 100644 --- a/src/main/java/teetime/framework/pipe/RelayTestPipe.java +++ b/src/main/java/teetime/framework/pipe/RelayTestPipe.java @@ -24,7 +24,7 @@ final class RelayTestPipe<T> extends AbstractInterThreadPipe { private final ConstructorClosure<T> inputObjectCreator; public RelayTestPipe(final int numInputObjects, final ConstructorClosure<T> inputObjectCreator) { - super(null, null); + super(null, null, Integer.MAX_VALUE); this.numInputObjects = numInputObjects; this.inputObjectCreator = inputObjectCreator; } diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipe.java b/src/main/java/teetime/framework/pipe/SingleElementPipe.java index 108a01f87d059176dfc31de2384606a26d4305c2..655b9b5f1b232e7dd1b4d51fc45a90800d30f64d 100644 --- a/src/main/java/teetime/framework/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/framework/pipe/SingleElementPipe.java @@ -24,7 +24,7 @@ final class SingleElementPipe extends AbstractIntraThreadPipe { private Object element; <T> SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - super(sourcePort, targetPort); + super(sourcePort, targetPort, 1); } @Override diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index 6fc9c350cda259c95b618fa6561af6e43932e82b..eb58b9e068777a71bd1c8cd490c338662117ea86 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -30,7 +30,7 @@ final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe private int numWaits; <T> SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - super(sourcePort, targetPort); + super(sourcePort, targetPort, capacity); this.queue = new ObservableSpScArrayQueue<Object>(capacity); } diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java index 3789144621fe5bfaaa6dd0fa9fda4c86e7637a9e..954c5918a5219e2f706d94850783af21f44a802a 100644 --- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java +++ b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java @@ -31,7 +31,7 @@ final class UnboundedSpScPipe extends AbstractInterThreadPipe { private final Queue<Object> queue; <T> UnboundedSpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - super(sourcePort, targetPort); + super(sourcePort, targetPort, Integer.MAX_VALUE); ConcurrentQueueSpec specification = new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT); this.queue = QueueFactory.newQueue(specification); } diff --git a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java index ad77ffde99e487473817ea1368ad3dbd045a0bbe..62b2d97b49eebce2ef8a66be1b1a0efb4b6d258d 100644 --- a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java +++ b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java @@ -69,6 +69,11 @@ class MergerTestingPipe implements IPipe { return 0; } + @Override + public int capacity() { + return 0; + } + @Override public Object removeLast() { return null;