Skip to content
Snippets Groups Projects
Commit fb333777 authored by Christian Wulf's avatar Christian Wulf
Browse files

worked on setting attributes in AbstractStage to final

parent c57b642b
No related branches found
No related tags found
No related merge requests found
Showing
with 128 additions and 56 deletions
......@@ -31,7 +31,6 @@ public class InputPort<T> extends AbstractPort<T> {
@Override
public void setPipe(final IPipe pipe) {
this.pipe = pipe;
pipe.setTargetPort(this);
}
public StageWithPort getOwningStage() {
......
......@@ -17,21 +17,30 @@ public abstract class AbstractPipe implements IPipe {
*/
protected StageWithPort cachedTargetStage;
@Override
public InputPort<?> getTargetPort() {
return this.targetPort;
protected <T> AbstractPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
this.targetPort = targetPort;
if (null != targetPort) { // BETTER remove this check if migration is completed
this.cachedTargetStage = targetPort.getOwningStage();
}
if (null != sourcePort) { // BETTER remove this check if migration is completed
sourcePort.setPipe(this);
}
if (null != targetPort) { // BETTER remove this check if migration is completed
targetPort.setPipe(this);
}
}
@Override
public void setTargetPort(final InputPort<?> targetPort) {
this.targetPort = targetPort;
this.cachedTargetStage = targetPort.getOwningStage();
public InputPort<?> getTargetPort() {
return this.targetPort;
}
@Override
public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
sourcePort.setPipe(this);
targetPort.setPipe(this);
this.targetPort = targetPort;
this.cachedTargetStage = targetPort.getOwningStage();
}
}
......@@ -8,9 +8,13 @@ public final class CommittablePipe extends IntraThreadPipe {
private final CommittableResizableArrayQueue<Object> elements = new CommittableResizableArrayQueue<Object>(null, 4);
<T> CommittablePipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
super(sourcePort, targetPort);
}
@Deprecated
public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
IPipe pipe = new CommittablePipe();
IPipe pipe = new CommittablePipe(null, null);
pipe.connectPorts(sourcePort, targetPort);
}
......
......@@ -43,9 +43,6 @@ public final class DummyPipe implements IPipe {
return null;
}
@Override
public void setTargetPort(final InputPort targetPort) {}
@Override
public void setSignal(final Signal signal) {}
......
......@@ -18,10 +18,9 @@ public interface IPipe {
InputPort<?> getTargetPort();
void setTargetPort(InputPort<?> targetPort);
void setSignal(Signal signal);
@Deprecated
<T> void connectPorts(OutputPort<? extends T> sourcePort, InputPort<T> targetPort);
void reportNewElement();
......
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
public interface IPipeFactory {
@Deprecated
IPipe create(int capacity);
<T> IPipe create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort);
<T> IPipe create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort, int capacity);
ThreadCommunication getThreadCommunication();
PipeOrdering getOrdering();
......
......@@ -2,12 +2,18 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
import java.util.concurrent.atomic.AtomicReference;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
public abstract class InterThreadPipe extends AbstractPipe {
private final AtomicReference<Signal> signal = new AtomicReference<Signal>();
<T> InterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
super(sourcePort, targetPort);
}
@Override
public void setSignal(final Signal signal) {
this.signal.lazySet(signal); // lazySet is legal due to our single-writer requirement
......
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
public abstract class IntraThreadPipe extends AbstractPipe {
<T> IntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
super(sourcePort, targetPort);
}
@Override
public void setSignal(final Signal signal) {
if (this.getTargetPort() != null) {
if (this.getTargetPort() != null) { // BETTER remove this check since there are DummyPorts
this.cachedTargetStage.onSignal(signal, this.getTargetPort());
}
}
......
......@@ -10,17 +10,14 @@ public final class OrderedGrowableArrayPipe extends IntraThreadPipe {
private int head;
private int tail;
public OrderedGrowableArrayPipe() {
this(1);
}
public OrderedGrowableArrayPipe(final int initialCapacity) {
this.elements = new CircularArray<Object>(initialCapacity);
<T> OrderedGrowableArrayPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort);
this.elements = new CircularArray<Object>(capacity);
}
@Deprecated
public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
IPipe pipe = new OrderedGrowableArrayPipe();
IPipe pipe = new OrderedGrowableArrayPipe(sourcePort, targetPort, 4);
pipe.connectPorts(sourcePort, targetPort);
}
......
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
public class OrderedGrowableArrayPipeFactory implements IPipeFactory {
/**
* Hint: The capacity for this pipe implementation is ignored
*/
@Override
public IPipe create(final int capacity) {
return new OrderedGrowableArrayPipe();
return create(null, null, capacity);
}
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return create(sourcePort, targetPort, 4);
}
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return new OrderedGrowableArrayPipe(sourcePort, targetPort, capacity);
}
@Override
......
......@@ -9,17 +9,14 @@ public class OrderedGrowablePipe extends IntraThreadPipe {
private final LinkedList<Object> elements;
public OrderedGrowablePipe() {
this(100000);
}
public OrderedGrowablePipe(final int initialCapacity) {
<T> OrderedGrowablePipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort);
this.elements = new LinkedList<Object>();
}
@Deprecated
public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
IPipe pipe = new OrderedGrowablePipe();
IPipe pipe = new OrderedGrowablePipe(null, null, 100000);
pipe.connectPorts(sourcePort, targetPort);
}
......
......@@ -55,16 +55,17 @@ public class PipeFactory {
}
public IPipe create(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable, final int capacity) {
IPipeFactory pipeFactory = getPipeFactory(tc, ordering, growable);
return pipeFactory.create(capacity);
}
public IPipeFactory getPipeFactory(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) {
String key = this.buildKey(tc, ordering, growable);
IPipeFactory pipeClass = this.pipeFactories.get(key);
if (null == pipeClass) {
IPipeFactory pipeFactory = this.pipeFactories.get(key);
if (null == pipeFactory) {
throw new CouldNotFindPipeImplException(key);
}
return pipeClass.create(capacity);
}
private String buildKey(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) {
return tc.toString() + ordering.toString() + growable;
return pipeFactory;
}
public void register(final IPipeFactory pipeFactory) {
......@@ -73,4 +74,7 @@ public class PipeFactory {
LOGGER.info("Registered pipe factory: " + pipeFactory.getClass().getCanonicalName());
}
private String buildKey(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) {
return tc.toString() + ordering.toString() + growable;
}
}
......@@ -7,8 +7,8 @@ public final class RelayTestPipe<T> extends InterThreadPipe {
private int numInputObjects;
private final ConstructorClosure<T> inputObjectCreator;
public RelayTestPipe(final int numInputObjects,
final ConstructorClosure<T> inputObjectCreator) {
public RelayTestPipe(final int numInputObjects, final ConstructorClosure<T> inputObjectCreator) {
super(null, null);
this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator;
}
......
......@@ -7,13 +7,13 @@ public final class SingleElementPipe extends IntraThreadPipe {
private Object element;
SingleElementPipe() {
super();
<T> SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
super(sourcePort, targetPort);
}
@Deprecated
public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
IPipe pipe = new SingleElementPipe();
IPipe pipe = new SingleElementPipe(null, null);
pipe.connectPorts(sourcePort, targetPort);
}
......
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
......@@ -10,7 +12,20 @@ public class SingleElementPipeFactory implements IPipeFactory {
*/
@Override
public IPipe create(final int capacity) {
return new SingleElementPipe();
return create(null, null);
}
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return create(sourcePort, targetPort, 1);
}
/**
* Hint: The capacity for this pipe implementation is ignored
*/
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return new SingleElementPipe(sourcePort, targetPort);
}
@Override
......
......@@ -16,14 +16,14 @@ public final class SpScPipe extends InterThreadPipe {
// statistics
private int numWaits;
SpScPipe(final int capacity) {
ConcurrentQueueSpec concurrentQueueSpec = new ConcurrentQueueSpec(1, 1, capacity, Ordering.FIFO, Preference.THROUGHPUT);
this.queue = QueueFactory.newQueue(concurrentQueueSpec);
<T> SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort);
this.queue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, capacity, Ordering.FIFO, Preference.THROUGHPUT));
}
@Deprecated
public static <T> SpScPipe connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
SpScPipe pipe = new SpScPipe(capacity);
SpScPipe pipe = new SpScPipe(sourcePort, targetPort, capacity);
pipe.connectPorts(sourcePort, targetPort);
return pipe;
}
......
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
......@@ -7,7 +9,17 @@ public class SpScPipeFactory implements IPipeFactory {
@Override
public IPipe create(final int capacity) {
return new SpScPipe(capacity);
return create(null, null, capacity);
}
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return create(sourcePort, targetPort, 4);
}
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return new SpScPipe(sourcePort, targetPort, capacity);
}
@Override
......@@ -24,4 +36,5 @@ public class SpScPipeFactory implements IPipeFactory {
public boolean isGrowable() {
return false;
}
}
......@@ -5,20 +5,18 @@ import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public final class UnorderedGrowablePipe extends IntraThreadPipe {
private final int MIN_CAPACITY;
private Object[] elements;
// private final ArrayWrapper2<T> elements = new ArrayWrapper2<T>(2);
private int lastFreeIndex;
UnorderedGrowablePipe() {
this.MIN_CAPACITY = 4;
this.elements = new Object[this.MIN_CAPACITY];
<T> UnorderedGrowablePipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort);
this.elements = new Object[capacity];
}
@Deprecated
public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
IPipe pipe = new UnorderedGrowablePipe();
IPipe pipe = new UnorderedGrowablePipe(null, null, 4);
pipe.connectPorts(sourcePort, targetPort);
}
......
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
......@@ -10,7 +12,17 @@ public class UnorderedGrowablePipeFactory implements IPipeFactory {
*/
@Override
public IPipe create(final int capacity) {
return new UnorderedGrowablePipe();
return create(null, null, capacity);
}
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return create(sourcePort, targetPort, 4);
}
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return new UnorderedGrowablePipe(sourcePort, targetPort, capacity);
}
@Override
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment