Skip to content
Snippets Groups Projects
Commit 07abd2bb authored by Christian Wulf's avatar Christian Wulf
Browse files

added source port to all pipe implementations

parent 836da0ea
No related branches found
No related tags found
No related merge requests found
...@@ -28,6 +28,7 @@ public abstract class AbstractPipe implements IPipe { ...@@ -28,6 +28,7 @@ public abstract class AbstractPipe implements IPipe {
*/ */
protected final Stage cachedTargetStage; protected final Stage cachedTargetStage;
private final OutputPort<?> sourcePort;
private final InputPort<?> targetPort; private final InputPort<?> targetPort;
protected <T> AbstractPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { protected <T> AbstractPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
...@@ -41,13 +42,19 @@ public abstract class AbstractPipe implements IPipe { ...@@ -41,13 +42,19 @@ public abstract class AbstractPipe implements IPipe {
sourcePort.setPipe(this); sourcePort.setPipe(this);
targetPort.setPipe(this); targetPort.setPipe(this);
this.sourcePort = sourcePort;
this.targetPort = targetPort; this.targetPort = targetPort;
this.cachedTargetStage = targetPort.getOwningStage(); this.cachedTargetStage = targetPort.getOwningStage();
} }
@Override
public OutputPort<?> getSourcePort() {
return sourcePort;
}
@Override @Override
public InputPort<?> getTargetPort() { public InputPort<?> getTargetPort() {
return this.targetPort; return targetPort;
} }
@Override @Override
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
package teetime.framework.pipe; package teetime.framework.pipe;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
/** /**
...@@ -51,6 +52,11 @@ public final class DummyPipe implements IPipe { ...@@ -51,6 +52,11 @@ public final class DummyPipe implements IPipe {
return 0; return 0;
} }
@Override
public OutputPort<?> getSourcePort() {
return null;
}
@Override @Override
public InputPort<Object> getTargetPort() { public InputPort<Object> getTargetPort() {
return null; return null;
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
package teetime.framework.pipe; package teetime.framework.pipe;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
/** /**
...@@ -61,9 +62,12 @@ public interface IPipe { ...@@ -61,9 +62,12 @@ public interface IPipe {
Object removeLast(); Object removeLast();
/** /**
* Retrieves the receiving port. * @return the output port that is connected to the pipe.
* */
* @return InputPort which is connected to the pipe. OutputPort<?> getSourcePort();
/**
* @return the input port that is connected to the pipe.
*/ */
InputPort<?> getTargetPort(); InputPort<?> getTargetPort();
......
...@@ -23,10 +23,12 @@ public class InstantiationPipe implements IPipe { ...@@ -23,10 +23,12 @@ public class InstantiationPipe implements IPipe {
private static final String ERROR_MESSAGE = "This must not be called while executing the configuration"; private static final String ERROR_MESSAGE = "This must not be called while executing the configuration";
private final OutputPort<?> sourcePort;
private final InputPort<?> targetPort; private final InputPort<?> targetPort;
private final int capacity; private final int capacity;
public <T> InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { public <T> InstantiationPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
this.sourcePort = sourcePort;
this.targetPort = targetPort; this.targetPort = targetPort;
this.capacity = capacity; this.capacity = capacity;
sourcePort.setPipe(this); sourcePort.setPipe(this);
...@@ -37,9 +39,14 @@ public class InstantiationPipe implements IPipe { ...@@ -37,9 +39,14 @@ public class InstantiationPipe implements IPipe {
return capacity; return capacity;
} }
@Override
public OutputPort<?> getSourcePort() {
return sourcePort;
}
@Override @Override
public InputPort<?> getTargetPort() { public InputPort<?> getTargetPort() {
return this.targetPort; return targetPort;
} }
@Override @Override
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
package teetime.stage.basic.merger; package teetime.stage.basic.merger;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipe;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.signal.StartingSignal; import teetime.framework.signal.StartingSignal;
...@@ -73,6 +74,11 @@ class MergerTestingPipe implements IPipe { ...@@ -73,6 +74,11 @@ class MergerTestingPipe implements IPipe {
return null; return null;
} }
@Override
public OutputPort<?> getSourcePort() {
return null;
}
@Override @Override
public InputPort<?> getTargetPort() { public InputPort<?> getTargetPort() {
return null; return null;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment