Skip to content
Snippets Groups Projects
Commit 6431b885 authored by Christian Wulf's avatar Christian Wulf
Browse files

removed connectPorts from IPipe

parent 624a9de1
No related branches found
No related tags found
No related merge requests found
...@@ -26,36 +26,30 @@ public abstract class AbstractPipe implements IPipe { ...@@ -26,36 +26,30 @@ public abstract class AbstractPipe implements IPipe {
* this.getPipe().getTargetPort().getOwningStage() * this.getPipe().getTargetPort().getOwningStage()
* </pre> * </pre>
*/ */
protected Stage cachedTargetStage; protected final Stage cachedTargetStage;
private InputPort<?> targetPort; private final InputPort<?> targetPort;
protected <T> AbstractPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { protected <T> AbstractPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
this.targetPort = targetPort; if (sourcePort == null) {
if (null != targetPort) { // BETTER remove this check if migration is completed throw new IllegalArgumentException("sourcePort may not be null");
this.cachedTargetStage = targetPort.getOwningStage();
}
if (null != sourcePort) { // BETTER remove this check if migration is completed
sourcePort.setPipe(this);
} }
if (null != targetPort) { // BETTER remove this check if migration is completed if (targetPort == null) {
targetPort.setPipe(this); throw new IllegalArgumentException("targetPort may not be null");
} }
}
@Override
public InputPort<?> getTargetPort() {
return this.targetPort;
}
@Override
public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
sourcePort.setPipe(this); sourcePort.setPipe(this);
targetPort.setPipe(this); targetPort.setPipe(this);
this.targetPort = targetPort; this.targetPort = targetPort;
this.cachedTargetStage = targetPort.getOwningStage(); this.cachedTargetStage = targetPort.getOwningStage();
} }
@Override
public InputPort<?> getTargetPort() {
return this.targetPort;
}
@Override @Override
public final boolean hasMore() { public final boolean hasMore() {
return !isEmpty(); return !isEmpty();
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
package teetime.framework.pipe; package teetime.framework.pipe;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
/** /**
...@@ -25,11 +24,8 @@ import teetime.framework.signal.ISignal; ...@@ -25,11 +24,8 @@ import teetime.framework.signal.ISignal;
* @author Christian Wulf * @author Christian Wulf
* *
*/ */
@SuppressWarnings("rawtypes")
public final class DummyPipe implements IPipe { public final class DummyPipe implements IPipe {
public DummyPipe() {}
@Override @Override
public boolean add(final Object element) { public boolean add(final Object element) {
return true; return true;
...@@ -63,9 +59,6 @@ public final class DummyPipe implements IPipe { ...@@ -63,9 +59,6 @@ public final class DummyPipe implements IPipe {
@Override @Override
public void sendSignal(final ISignal signal) {} public void sendSignal(final ISignal signal) {}
@Override
public void connectPorts(final OutputPort sourcePort, final InputPort targetPort) {}
@Override @Override
public void reportNewElement() { public void reportNewElement() {
// do nothing // do nothing
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
package teetime.framework.pipe; package teetime.framework.pipe;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
/** /**
...@@ -76,9 +75,6 @@ public interface IPipe { ...@@ -76,9 +75,6 @@ public interface IPipe {
*/ */
void sendSignal(ISignal signal); void sendSignal(ISignal signal);
@Deprecated
<T> void connectPorts(OutputPort<? extends T> sourcePort, InputPort<T> targetPort);
/** /**
* Stages report new elements with this method. * Stages report new elements with this method.
*/ */
......
...@@ -21,7 +21,7 @@ import teetime.framework.signal.ISignal; ...@@ -21,7 +21,7 @@ import teetime.framework.signal.ISignal;
public class InstantiationPipe implements IPipe { public class InstantiationPipe implements IPipe {
private final InputPort target; private final InputPort<?> target;
private final int capacity; private final int capacity;
public <T> InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { public <T> InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
...@@ -76,12 +76,6 @@ public class InstantiationPipe implements IPipe { ...@@ -76,12 +76,6 @@ public class InstantiationPipe implements IPipe {
} }
@Override
public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
// TODO Auto-generated method stub
}
@Override @Override
public void reportNewElement() { public void reportNewElement() {
// TODO Auto-generated method stub // TODO Auto-generated method stub
......
...@@ -27,12 +27,6 @@ final class SingleElementPipe extends AbstractIntraThreadPipe { ...@@ -27,12 +27,6 @@ final class SingleElementPipe extends AbstractIntraThreadPipe {
super(sourcePort, targetPort); super(sourcePort, targetPort);
} }
@Deprecated
public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
final IPipe pipe = new SingleElementPipe(null, null);
pipe.connectPorts(sourcePort, targetPort);
}
@Override @Override
public boolean add(final Object element) { public boolean add(final Object element) {
if (null == element) { if (null == element) {
......
...@@ -34,13 +34,6 @@ final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe ...@@ -34,13 +34,6 @@ final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe
this.queue = new ObservableSpScArrayQueue<Object>(capacity); this.queue = new ObservableSpScArrayQueue<Object>(capacity);
} }
@Deprecated
public static <T> IMonitorablePipe connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
final SpScPipe pipe = new SpScPipe(sourcePort, targetPort, capacity);
pipe.connectPorts(sourcePort, targetPort);
return pipe;
}
// BETTER introduce a QueueIsFullStrategy // BETTER introduce a QueueIsFullStrategy
@Override @Override
public boolean add(final Object element) { public boolean add(final Object element) {
......
...@@ -26,9 +26,9 @@ import teetime.framework.OutputPort; ...@@ -26,9 +26,9 @@ import teetime.framework.OutputPort;
* @param T * @param T
* the type of the input port and the output ports * the type of the input port and the output ports
*/ */
public final class Distributor<T> extends AbstractConsumerStage<T> { public class Distributor<T> extends AbstractConsumerStage<T> {
private IDistributorStrategy strategy; protected IDistributorStrategy strategy;
public Distributor() { public Distributor() {
this(new RoundRobinStrategy2()); this(new RoundRobinStrategy2());
......
...@@ -73,7 +73,7 @@ public class SpScPipeTest { ...@@ -73,7 +73,7 @@ public class SpScPipeTest {
assertEquals(signals, secondSignals); assertEquals(signals, secondSignals);
} }
@Test(expected = NullPointerException.class) @Test(expected = IllegalArgumentException.class)
public void testAdd() throws Exception { public void testAdd() throws Exception {
SpScPipe pipe = new SpScPipe(null, null, 4); SpScPipe pipe = new SpScPipe(null, null, 4);
assertFalse(pipe.add(null)); assertFalse(pipe.add(null));
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
package teetime.stage.basic.merger; package teetime.stage.basic.merger;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipe;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.signal.StartingSignal; import teetime.framework.signal.StartingSignal;
...@@ -79,11 +78,6 @@ class MergerTestingPipe implements IPipe { ...@@ -79,11 +78,6 @@ class MergerTestingPipe implements IPipe {
return null; return null;
} }
@Override
public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
}
@Override @Override
public void reportNewElement() { public void reportNewElement() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment