diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java index 02d74d49eee3273d64fb02b541d3afbc5049eacd..be4ed828fb87382299a64fd598b291a813aeeb5c 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java @@ -31,7 +31,6 @@ public class InputPort<T> extends AbstractPort<T> { @Override public void setPipe(final IPipe pipe) { this.pipe = pipe; - pipe.setTargetPort(this); } public StageWithPort getOwningStage() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java index e222c2570ba6b57814e6a4be83e014fcbc4f0e5d..d34fd86080e18e7936813bc8fdf0e6ee24606a82 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java @@ -17,21 +17,30 @@ public abstract class AbstractPipe implements IPipe { */ protected StageWithPort cachedTargetStage; - @Override - public InputPort<?> getTargetPort() { - return this.targetPort; + protected <T> AbstractPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + this.targetPort = targetPort; + if (null != targetPort) { // BETTER remove this check if migration is completed + this.cachedTargetStage = targetPort.getOwningStage(); + } + if (null != sourcePort) { // BETTER remove this check if migration is completed + sourcePort.setPipe(this); + } + if (null != targetPort) { // BETTER remove this check if migration is completed + targetPort.setPipe(this); + } } @Override - public void setTargetPort(final InputPort<?> targetPort) { - this.targetPort = targetPort; - this.cachedTargetStage = targetPort.getOwningStage(); + public InputPort<?> getTargetPort() { + return this.targetPort; } @Override public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { sourcePort.setPipe(this); targetPort.setPipe(this); + this.targetPort = targetPort; + this.cachedTargetStage = targetPort.getOwningStage(); } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java index 9cf15055b71549c7f98a18c8cc9123128519b33f..2badc2ea6117cd94c8a2e840fd737e6bdcbe3ac8 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java @@ -8,9 +8,13 @@ public final class CommittablePipe extends IntraThreadPipe { private final CommittableResizableArrayQueue<Object> elements = new CommittableResizableArrayQueue<Object>(null, 4); + <T> CommittablePipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + super(sourcePort, targetPort); + } + @Deprecated public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - IPipe pipe = new CommittablePipe(); + IPipe pipe = new CommittablePipe(null, null); pipe.connectPorts(sourcePort, targetPort); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java index 4c8c195f870b0e2f3c8b3e0d2a87ae3b515c142d..26888277d24a1dfa57d33c6c5a79936ec6849db7 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java @@ -43,9 +43,6 @@ public final class DummyPipe implements IPipe { return null; } - @Override - public void setTargetPort(final InputPort targetPort) {} - @Override public void setSignal(final Signal signal) {} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java index 28c43332d65e180ffd41c6a4703366918c7e9442..a89a3c680616dcbbf035329ed61659f4b520f4fc 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java @@ -18,10 +18,9 @@ public interface IPipe { InputPort<?> getTargetPort(); - void setTargetPort(InputPort<?> targetPort); - void setSignal(Signal signal); + @Deprecated <T> void connectPorts(OutputPort<? extends T> sourcePort, InputPort<T> targetPort); void reportNewElement(); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java index 382fe5f2e038f948af3a56d58836f74215d3dd14..da4bbd44d6f4f3f7eb2c361eae579a92f296949f 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java @@ -1,12 +1,19 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; public interface IPipeFactory { + @Deprecated IPipe create(int capacity); + <T> IPipe create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort); + + <T> IPipe create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort, int capacity); + ThreadCommunication getThreadCommunication(); PipeOrdering getOrdering(); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java index 94c562ebbea24afecfc863ebf60cdf6abbbc1b8d..37ad702248c45a684fa71c3ad0ae6db0c2bd033f 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java @@ -2,12 +2,18 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import java.util.concurrent.atomic.AtomicReference; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; public abstract class InterThreadPipe extends AbstractPipe { private final AtomicReference<Signal> signal = new AtomicReference<Signal>(); + <T> InterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + super(sourcePort, targetPort); + } + @Override public void setSignal(final Signal signal) { this.signal.lazySet(signal); // lazySet is legal due to our single-writer requirement diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java index 874663cc575d115f42b065c58146e775f9054d80..04857a9e656ccec1b58c4e60eabce32f90d3c8fd 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java @@ -1,12 +1,18 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; public abstract class IntraThreadPipe extends AbstractPipe { + <T> IntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + super(sourcePort, targetPort); + } + @Override public void setSignal(final Signal signal) { - if (this.getTargetPort() != null) { + if (this.getTargetPort() != null) { // BETTER remove this check since there are DummyPorts this.cachedTargetStage.onSignal(signal, this.getTargetPort()); } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java index f1944529e0089eb9c54f7413c475864af7ca3885..e059acd4ccaffb831004703475c863593bab1d53 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java @@ -10,17 +10,14 @@ public final class OrderedGrowableArrayPipe extends IntraThreadPipe { private int head; private int tail; - public OrderedGrowableArrayPipe() { - this(1); - } - - public OrderedGrowableArrayPipe(final int initialCapacity) { - this.elements = new CircularArray<Object>(initialCapacity); + <T> OrderedGrowableArrayPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + super(sourcePort, targetPort); + this.elements = new CircularArray<Object>(capacity); } @Deprecated public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - IPipe pipe = new OrderedGrowableArrayPipe(); + IPipe pipe = new OrderedGrowableArrayPipe(sourcePort, targetPort, 4); pipe.connectPorts(sourcePort, targetPort); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java index 86a00ca10b19943c70750f150c2eaf802681b739..7290a7821ae5558b04e2b9675848875affd959c6 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java @@ -1,16 +1,25 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; public class OrderedGrowableArrayPipeFactory implements IPipeFactory { - /** - * Hint: The capacity for this pipe implementation is ignored - */ @Override public IPipe create(final int capacity) { - return new OrderedGrowableArrayPipe(); + return create(null, null, capacity); + } + + @Override + public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + return create(sourcePort, targetPort, 4); + } + + @Override + public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + return new OrderedGrowableArrayPipe(sourcePort, targetPort, capacity); } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java index d63a5f86ff0864b2a76557e1900ff3d2b4f7d8b8..eab5df8f182e646d43d56c520b6c92467c9ebfc5 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java @@ -9,17 +9,14 @@ public class OrderedGrowablePipe extends IntraThreadPipe { private final LinkedList<Object> elements; - public OrderedGrowablePipe() { - this(100000); - } - - public OrderedGrowablePipe(final int initialCapacity) { + <T> OrderedGrowablePipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + super(sourcePort, targetPort); this.elements = new LinkedList<Object>(); } @Deprecated public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - IPipe pipe = new OrderedGrowablePipe(); + IPipe pipe = new OrderedGrowablePipe(null, null, 100000); pipe.connectPorts(sourcePort, targetPort); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java index d876a385f21e5457531d030546054935959f3d19..20fbec00a613e57386db382d9fec2b2d0295feac 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java @@ -55,16 +55,17 @@ public class PipeFactory { } public IPipe create(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable, final int capacity) { + IPipeFactory pipeFactory = getPipeFactory(tc, ordering, growable); + return pipeFactory.create(capacity); + } + + public IPipeFactory getPipeFactory(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) { String key = this.buildKey(tc, ordering, growable); - IPipeFactory pipeClass = this.pipeFactories.get(key); - if (null == pipeClass) { + IPipeFactory pipeFactory = this.pipeFactories.get(key); + if (null == pipeFactory) { throw new CouldNotFindPipeImplException(key); } - return pipeClass.create(capacity); - } - - private String buildKey(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) { - return tc.toString() + ordering.toString() + growable; + return pipeFactory; } public void register(final IPipeFactory pipeFactory) { @@ -73,4 +74,7 @@ public class PipeFactory { LOGGER.info("Registered pipe factory: " + pipeFactory.getClass().getCanonicalName()); } + private String buildKey(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) { + return tc.toString() + ordering.toString() + growable; + } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java index bbd11753ade86bfe33c0841f9814359a3b67768b..26e8da0505d110b44b36265d66767a94f3908874 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java @@ -7,8 +7,8 @@ public final class RelayTestPipe<T> extends InterThreadPipe { private int numInputObjects; private final ConstructorClosure<T> inputObjectCreator; - public RelayTestPipe(final int numInputObjects, - final ConstructorClosure<T> inputObjectCreator) { + public RelayTestPipe(final int numInputObjects, final ConstructorClosure<T> inputObjectCreator) { + super(null, null); this.numInputObjects = numInputObjects; this.inputObjectCreator = inputObjectCreator; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java index 73cba7744747b47df7f65e9c79195eb34cd4b052..f40eca65363d308b1a71d817e0843df351b9735d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java @@ -7,13 +7,13 @@ public final class SingleElementPipe extends IntraThreadPipe { private Object element; - SingleElementPipe() { - super(); + <T> SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + super(sourcePort, targetPort); } @Deprecated public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - IPipe pipe = new SingleElementPipe(); + IPipe pipe = new SingleElementPipe(null, null); pipe.connectPorts(sourcePort, targetPort); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java index fcbde3f4e79ef253ac99fe2fb8d43d75693d08a5..22309e35f5f3696a76b115e174266f2195b3bfdd 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java @@ -1,5 +1,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; @@ -10,7 +12,20 @@ public class SingleElementPipeFactory implements IPipeFactory { */ @Override public IPipe create(final int capacity) { - return new SingleElementPipe(); + return create(null, null); + } + + @Override + public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + return create(sourcePort, targetPort, 1); + } + + /** + * Hint: The capacity for this pipe implementation is ignored + */ + @Override + public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + return new SingleElementPipe(sourcePort, targetPort); } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java index 4ac039e4d5e542ca383322486074c264204ed3c7..acb1cc62026574ceb4e08a54e9147435b12c9a3a 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java @@ -16,14 +16,14 @@ public final class SpScPipe extends InterThreadPipe { // statistics private int numWaits; - SpScPipe(final int capacity) { - ConcurrentQueueSpec concurrentQueueSpec = new ConcurrentQueueSpec(1, 1, capacity, Ordering.FIFO, Preference.THROUGHPUT); - this.queue = QueueFactory.newQueue(concurrentQueueSpec); + <T> SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + super(sourcePort, targetPort); + this.queue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, capacity, Ordering.FIFO, Preference.THROUGHPUT)); } @Deprecated public static <T> SpScPipe connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - SpScPipe pipe = new SpScPipe(capacity); + SpScPipe pipe = new SpScPipe(sourcePort, targetPort, capacity); pipe.connectPorts(sourcePort, targetPort); return pipe; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java index d81ade9dce0de2bbeffdecb307088b6d5befef8b..fa0686e264ea2dfcf21249d8c7811d2f7c836039 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java @@ -1,5 +1,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; @@ -7,7 +9,17 @@ public class SpScPipeFactory implements IPipeFactory { @Override public IPipe create(final int capacity) { - return new SpScPipe(capacity); + return create(null, null, capacity); + } + + @Override + public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + return create(sourcePort, targetPort, 4); + } + + @Override + public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + return new SpScPipe(sourcePort, targetPort, capacity); } @Override @@ -24,4 +36,5 @@ public class SpScPipeFactory implements IPipeFactory { public boolean isGrowable() { return false; } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java index fbbd079ca4039c073df17b8cd04d37522da10a6d..878844e28a1073c5a544656a8f24781b8efd4c90 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java @@ -5,20 +5,18 @@ import teetime.variant.methodcallWithPorts.framework.core.OutputPort; public final class UnorderedGrowablePipe extends IntraThreadPipe { - private final int MIN_CAPACITY; - private Object[] elements; // private final ArrayWrapper2<T> elements = new ArrayWrapper2<T>(2); private int lastFreeIndex; - UnorderedGrowablePipe() { - this.MIN_CAPACITY = 4; - this.elements = new Object[this.MIN_CAPACITY]; + <T> UnorderedGrowablePipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + super(sourcePort, targetPort); + this.elements = new Object[capacity]; } @Deprecated public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - IPipe pipe = new UnorderedGrowablePipe(); + IPipe pipe = new UnorderedGrowablePipe(null, null, 4); pipe.connectPorts(sourcePort, targetPort); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java index c6d1b87f10dae68bdf9162628cb4e8458b7f3259..536efc0ceee774f6652b3cd3f9a68524a27077d5 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java @@ -1,5 +1,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; @@ -10,7 +12,17 @@ public class UnorderedGrowablePipeFactory implements IPipeFactory { */ @Override public IPipe create(final int capacity) { - return new UnorderedGrowablePipe(); + return create(null, null, capacity); + } + + @Override + public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + return create(sourcePort, targetPort, 4); + } + + @Override + public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + return new UnorderedGrowablePipe(sourcePort, targetPort, capacity); } @Override