From 6431b885deff9f8b43c062e5b2910f38f1d8dcfa Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Thu, 18 Jun 2015 12:24:17 +0200
Subject: [PATCH] removed connectPorts from IPipe

---
 .../java/teetime/framework/AbstractPipe.java  | 30 ++++++++-----------
 .../teetime/framework/pipe/DummyPipe.java     |  7 -----
 .../java/teetime/framework/pipe/IPipe.java    |  4 ---
 .../framework/pipe/InstantiationPipe.java     |  8 +----
 .../framework/pipe/SingleElementPipe.java     |  6 ----
 .../java/teetime/framework/pipe/SpScPipe.java |  7 -----
 .../stage/basic/distributor/Distributor.java  |  4 +--
 .../teetime/framework/pipe/SpScPipeTest.java  |  2 +-
 .../stage/basic/merger/MergerTestingPipe.java |  6 ----
 9 files changed, 16 insertions(+), 58 deletions(-)

diff --git a/src/main/java/teetime/framework/AbstractPipe.java b/src/main/java/teetime/framework/AbstractPipe.java
index 7485f8d4..58b0b2ee 100644
--- a/src/main/java/teetime/framework/AbstractPipe.java
+++ b/src/main/java/teetime/framework/AbstractPipe.java
@@ -26,36 +26,30 @@ public abstract class AbstractPipe implements IPipe {
 	 * this.getPipe().getTargetPort().getOwningStage()
 	 * </pre>
 	 */
-	protected Stage cachedTargetStage;
+	protected final Stage cachedTargetStage;
 
-	private InputPort<?> targetPort;
+	private final InputPort<?> targetPort;
 
 	protected <T> AbstractPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
-		this.targetPort = targetPort;
-		if (null != targetPort) { // BETTER remove this check if migration is completed
-			this.cachedTargetStage = targetPort.getOwningStage();
-		}
-		if (null != sourcePort) { // BETTER remove this check if migration is completed
-			sourcePort.setPipe(this);
+		if (sourcePort == null) {
+			throw new IllegalArgumentException("sourcePort may not be null");
 		}
-		if (null != targetPort) { // BETTER remove this check if migration is completed
-			targetPort.setPipe(this);
+		if (targetPort == null) {
+			throw new IllegalArgumentException("targetPort may not be null");
 		}
-	}
-
-	@Override
-	public InputPort<?> getTargetPort() {
-		return this.targetPort;
-	}
 
-	@Override
-	public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
 		sourcePort.setPipe(this);
 		targetPort.setPipe(this);
+
 		this.targetPort = targetPort;
 		this.cachedTargetStage = targetPort.getOwningStage();
 	}
 
+	@Override
+	public InputPort<?> getTargetPort() {
+		return this.targetPort;
+	}
+
 	@Override
 	public final boolean hasMore() {
 		return !isEmpty();
diff --git a/src/main/java/teetime/framework/pipe/DummyPipe.java b/src/main/java/teetime/framework/pipe/DummyPipe.java
index 2fe5d883..b5d914db 100644
--- a/src/main/java/teetime/framework/pipe/DummyPipe.java
+++ b/src/main/java/teetime/framework/pipe/DummyPipe.java
@@ -16,7 +16,6 @@
 package teetime.framework.pipe;
 
 import teetime.framework.InputPort;
-import teetime.framework.OutputPort;
 import teetime.framework.signal.ISignal;
 
 /**
@@ -25,11 +24,8 @@ import teetime.framework.signal.ISignal;
  * @author Christian Wulf
  *
  */
-@SuppressWarnings("rawtypes")
 public final class DummyPipe implements IPipe {
 
-	public DummyPipe() {}
-
 	@Override
 	public boolean add(final Object element) {
 		return true;
@@ -63,9 +59,6 @@ public final class DummyPipe implements IPipe {
 	@Override
 	public void sendSignal(final ISignal signal) {}
 
-	@Override
-	public void connectPorts(final OutputPort sourcePort, final InputPort targetPort) {}
-
 	@Override
 	public void reportNewElement() {
 		// do nothing
diff --git a/src/main/java/teetime/framework/pipe/IPipe.java b/src/main/java/teetime/framework/pipe/IPipe.java
index 183aac72..0ff764be 100644
--- a/src/main/java/teetime/framework/pipe/IPipe.java
+++ b/src/main/java/teetime/framework/pipe/IPipe.java
@@ -16,7 +16,6 @@
 package teetime.framework.pipe;
 
 import teetime.framework.InputPort;
-import teetime.framework.OutputPort;
 import teetime.framework.signal.ISignal;
 
 /**
@@ -76,9 +75,6 @@ public interface IPipe {
 	 */
 	void sendSignal(ISignal signal);
 
-	@Deprecated
-	<T> void connectPorts(OutputPort<? extends T> sourcePort, InputPort<T> targetPort);
-
 	/**
 	 * Stages report new elements with this method.
 	 */
diff --git a/src/main/java/teetime/framework/pipe/InstantiationPipe.java b/src/main/java/teetime/framework/pipe/InstantiationPipe.java
index 702de988..7e5d4e0f 100644
--- a/src/main/java/teetime/framework/pipe/InstantiationPipe.java
+++ b/src/main/java/teetime/framework/pipe/InstantiationPipe.java
@@ -21,7 +21,7 @@ import teetime.framework.signal.ISignal;
 
 public class InstantiationPipe implements IPipe {
 
-	private final InputPort target;
+	private final InputPort<?> target;
 	private final int capacity;
 
 	public <T> InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
@@ -76,12 +76,6 @@ public class InstantiationPipe implements IPipe {
 
 	}
 
-	@Override
-	public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
-		// TODO Auto-generated method stub
-
-	}
-
 	@Override
 	public void reportNewElement() {
 		// TODO Auto-generated method stub
diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipe.java b/src/main/java/teetime/framework/pipe/SingleElementPipe.java
index 200d73be..c3363894 100644
--- a/src/main/java/teetime/framework/pipe/SingleElementPipe.java
+++ b/src/main/java/teetime/framework/pipe/SingleElementPipe.java
@@ -27,12 +27,6 @@ final class SingleElementPipe extends AbstractIntraThreadPipe {
 		super(sourcePort, targetPort);
 	}
 
-	@Deprecated
-	public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
-		final IPipe pipe = new SingleElementPipe(null, null);
-		pipe.connectPorts(sourcePort, targetPort);
-	}
-
 	@Override
 	public boolean add(final Object element) {
 		if (null == element) {
diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java
index 7d68aafb..d515bcd5 100644
--- a/src/main/java/teetime/framework/pipe/SpScPipe.java
+++ b/src/main/java/teetime/framework/pipe/SpScPipe.java
@@ -34,13 +34,6 @@ final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe
 		this.queue = new ObservableSpScArrayQueue<Object>(capacity);
 	}
 
-	@Deprecated
-	public static <T> IMonitorablePipe connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
-		final SpScPipe pipe = new SpScPipe(sourcePort, targetPort, capacity);
-		pipe.connectPorts(sourcePort, targetPort);
-		return pipe;
-	}
-
 	// BETTER introduce a QueueIsFullStrategy
 	@Override
 	public boolean add(final Object element) {
diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java
index ab19ba48..485fb876 100644
--- a/src/main/java/teetime/stage/basic/distributor/Distributor.java
+++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java
@@ -26,9 +26,9 @@ import teetime.framework.OutputPort;
  * @param T
  *            the type of the input port and the output ports
  */
-public final class Distributor<T> extends AbstractConsumerStage<T> {
+public class Distributor<T> extends AbstractConsumerStage<T> {
 
-	private IDistributorStrategy strategy;
+	protected IDistributorStrategy strategy;
 
 	public Distributor() {
 		this(new RoundRobinStrategy2());
diff --git a/src/test/java/teetime/framework/pipe/SpScPipeTest.java b/src/test/java/teetime/framework/pipe/SpScPipeTest.java
index c7a2db5b..3a1e76b1 100644
--- a/src/test/java/teetime/framework/pipe/SpScPipeTest.java
+++ b/src/test/java/teetime/framework/pipe/SpScPipeTest.java
@@ -73,7 +73,7 @@ public class SpScPipeTest {
 		assertEquals(signals, secondSignals);
 	}
 
-	@Test(expected = NullPointerException.class)
+	@Test(expected = IllegalArgumentException.class)
 	public void testAdd() throws Exception {
 		SpScPipe pipe = new SpScPipe(null, null, 4);
 		assertFalse(pipe.add(null));
diff --git a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
index 51c82d81..ea411711 100644
--- a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
+++ b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java
@@ -16,7 +16,6 @@
 package teetime.stage.basic.merger;
 
 import teetime.framework.InputPort;
-import teetime.framework.OutputPort;
 import teetime.framework.pipe.IPipe;
 import teetime.framework.signal.ISignal;
 import teetime.framework.signal.StartingSignal;
@@ -79,11 +78,6 @@ class MergerTestingPipe implements IPipe {
 		return null;
 	}
 
-	@Override
-	public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
-
-	}
-
 	@Override
 	public void reportNewElement() {
 
-- 
GitLab