diff --git a/src/main/java/teetime/framework/AbstractPipe.java b/src/main/java/teetime/framework/AbstractPipe.java index 7485f8d45b6796a5f94bf853d526cbb009f8da8f..58b0b2eecd1fe2454383da67666682e88b016643 100644 --- a/src/main/java/teetime/framework/AbstractPipe.java +++ b/src/main/java/teetime/framework/AbstractPipe.java @@ -26,36 +26,30 @@ public abstract class AbstractPipe implements IPipe { * this.getPipe().getTargetPort().getOwningStage() * </pre> */ - protected Stage cachedTargetStage; + protected final Stage cachedTargetStage; - private InputPort<?> targetPort; + private final InputPort<?> targetPort; protected <T> AbstractPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - this.targetPort = targetPort; - if (null != targetPort) { // BETTER remove this check if migration is completed - this.cachedTargetStage = targetPort.getOwningStage(); - } - if (null != sourcePort) { // BETTER remove this check if migration is completed - sourcePort.setPipe(this); + if (sourcePort == null) { + throw new IllegalArgumentException("sourcePort may not be null"); } - if (null != targetPort) { // BETTER remove this check if migration is completed - targetPort.setPipe(this); + if (targetPort == null) { + throw new IllegalArgumentException("targetPort may not be null"); } - } - - @Override - public InputPort<?> getTargetPort() { - return this.targetPort; - } - @Override - public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { sourcePort.setPipe(this); targetPort.setPipe(this); + this.targetPort = targetPort; this.cachedTargetStage = targetPort.getOwningStage(); } + @Override + public InputPort<?> getTargetPort() { + return this.targetPort; + } + @Override public final boolean hasMore() { return !isEmpty(); diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java index 2fe5d8833e237931be44dc2aa69ab3f3b4d8606b..b5d914db8917b763936fc5b52b7a1badbcfe4a7e 100644 --- a/src/main/java/teetime/framework/pipe/DummyPipe.java +++ b/src/main/java/teetime/framework/pipe/DummyPipe.java @@ -16,7 +16,6 @@ package teetime.framework.pipe; import teetime.framework.InputPort; -import teetime.framework.OutputPort; import teetime.framework.signal.ISignal; /** @@ -25,11 +24,8 @@ import teetime.framework.signal.ISignal; * @author Christian Wulf * */ -@SuppressWarnings("rawtypes") public final class DummyPipe implements IPipe { - public DummyPipe() {} - @Override public boolean add(final Object element) { return true; @@ -63,9 +59,6 @@ public final class DummyPipe implements IPipe { @Override public void sendSignal(final ISignal signal) {} - @Override - public void connectPorts(final OutputPort sourcePort, final InputPort targetPort) {} - @Override public void reportNewElement() { // do nothing diff --git a/src/main/java/teetime/framework/pipe/IPipe.java b/src/main/java/teetime/framework/pipe/IPipe.java index 183aac72c8b50343ca9939dd1daf7ae7b4aa6bf7..0ff764be04028bf96076ab1d6db19f2de71a7372 100644 --- a/src/main/java/teetime/framework/pipe/IPipe.java +++ b/src/main/java/teetime/framework/pipe/IPipe.java @@ -16,7 +16,6 @@ package teetime.framework.pipe; import teetime.framework.InputPort; -import teetime.framework.OutputPort; import teetime.framework.signal.ISignal; /** @@ -76,9 +75,6 @@ public interface IPipe { */ void sendSignal(ISignal signal); - @Deprecated - <T> void connectPorts(OutputPort<? extends T> sourcePort, InputPort<T> targetPort); - /** * Stages report new elements with this method. */ diff --git a/src/main/java/teetime/framework/pipe/InstantiationPipe.java b/src/main/java/teetime/framework/pipe/InstantiationPipe.java index 04b181455ad0fe386dbe958586bdc412381d0f39..64e6dbf21689280e9f0c9e95444374e2aefde725 100644 --- a/src/main/java/teetime/framework/pipe/InstantiationPipe.java +++ b/src/main/java/teetime/framework/pipe/InstantiationPipe.java @@ -21,7 +21,7 @@ import teetime.framework.signal.ISignal; public class InstantiationPipe implements IPipe { - private final InputPort target; + private final InputPort<?> target; private final int capacity; public <T> InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { @@ -69,11 +69,6 @@ public class InstantiationPipe implements IPipe { throw new IllegalStateException("This must not be called while executing the configuration"); } - @Override - public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - throw new IllegalStateException("This must not be called while executing the configuration"); - } - @Override public void reportNewElement() { throw new IllegalStateException("This must not be called while executing the configuration"); diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipe.java b/src/main/java/teetime/framework/pipe/SingleElementPipe.java index 200d73be77a0b39bcd7c50cc882ee5e5115ed795..c33638940f4c91cef128686e52fa17d8c1d27fae 100644 --- a/src/main/java/teetime/framework/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/framework/pipe/SingleElementPipe.java @@ -27,12 +27,6 @@ final class SingleElementPipe extends AbstractIntraThreadPipe { super(sourcePort, targetPort); } - @Deprecated - public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - final IPipe pipe = new SingleElementPipe(null, null); - pipe.connectPorts(sourcePort, targetPort); - } - @Override public boolean add(final Object element) { if (null == element) { diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index 7d68aafb9af1ffdefcb242f1eb1d042bcfe493d3..d515bcd580bbd09d342a5ebd8f24cf46cbbc47c1 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -34,13 +34,6 @@ final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe this.queue = new ObservableSpScArrayQueue<Object>(capacity); } - @Deprecated - public static <T> IMonitorablePipe connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - final SpScPipe pipe = new SpScPipe(sourcePort, targetPort, capacity); - pipe.connectPorts(sourcePort, targetPort); - return pipe; - } - // BETTER introduce a QueueIsFullStrategy @Override public boolean add(final Object element) { diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java index ab19ba485881843df71f11530d24329101aa438b..485fb876e81f17fb95ac99e6addf9a8c9e1b3f93 100644 --- a/src/main/java/teetime/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java @@ -26,9 +26,9 @@ import teetime.framework.OutputPort; * @param T * the type of the input port and the output ports */ -public final class Distributor<T> extends AbstractConsumerStage<T> { +public class Distributor<T> extends AbstractConsumerStage<T> { - private IDistributorStrategy strategy; + protected IDistributorStrategy strategy; public Distributor() { this(new RoundRobinStrategy2()); diff --git a/src/test/java/teetime/framework/pipe/SpScPipeTest.java b/src/test/java/teetime/framework/pipe/SpScPipeTest.java index c7a2db5b4ddd36feee67a09fb80d84f1a57559da..3a1e76b11424fd2b40fc0071c48411a1f2da192b 100644 --- a/src/test/java/teetime/framework/pipe/SpScPipeTest.java +++ b/src/test/java/teetime/framework/pipe/SpScPipeTest.java @@ -73,7 +73,7 @@ public class SpScPipeTest { assertEquals(signals, secondSignals); } - @Test(expected = NullPointerException.class) + @Test(expected = IllegalArgumentException.class) public void testAdd() throws Exception { SpScPipe pipe = new SpScPipe(null, null, 4); assertFalse(pipe.add(null)); diff --git a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java index 51c82d813ae7f7f70674160f60e9cc998fa124c8..ea411711ff8dd44680a2afc831c099041c734993 100644 --- a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java +++ b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java @@ -16,7 +16,6 @@ package teetime.stage.basic.merger; import teetime.framework.InputPort; -import teetime.framework.OutputPort; import teetime.framework.pipe.IPipe; import teetime.framework.signal.ISignal; import teetime.framework.signal.StartingSignal; @@ -79,11 +78,6 @@ class MergerTestingPipe implements IPipe { return null; } - @Override - public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - - } - @Override public void reportNewElement() {