Skip to content
Snippets Groups Projects
Commit 9675d394 authored by Christian Wulf's avatar Christian Wulf
Browse files

#207 added capacity to IPipe

parent f85a6b12
No related branches found
No related tags found
No related merge requests found
Showing
with 41 additions and 15 deletions
...@@ -38,8 +38,8 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { ...@@ -38,8 +38,8 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe {
private volatile boolean closed; private volatile boolean closed;
protected <T> AbstractInterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { protected <T> AbstractInterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort); super(sourcePort, targetPort, capacity);
final Queue<ISignal> localSignalQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT)); final Queue<ISignal> localSignalQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
final PutStrategy<ISignal> putStrategy = new YieldPutStrategy<ISignal>(); final PutStrategy<ISignal> putStrategy = new YieldPutStrategy<ISignal>();
final TakeStrategy<ISignal> takeStrategy = new SCParkTakeStrategy<ISignal>(); final TakeStrategy<ISignal> takeStrategy = new SCParkTakeStrategy<ISignal>();
......
...@@ -21,8 +21,8 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe { ...@@ -21,8 +21,8 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe {
private boolean closed; private boolean closed;
protected <T> AbstractIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { protected <T> AbstractIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort); super(sourcePort, targetPort, capacity);
} }
@Override @Override
......
...@@ -30,8 +30,10 @@ public abstract class AbstractPipe implements IPipe { ...@@ -30,8 +30,10 @@ public abstract class AbstractPipe implements IPipe {
private final OutputPort<?> sourcePort; private final OutputPort<?> sourcePort;
private final InputPort<?> targetPort; private final InputPort<?> targetPort;
@SuppressWarnings("PMD.AvoidFieldNameMatchingMethodName")
private final int capacity;
protected <T> AbstractPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { protected <T> AbstractPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
if (sourcePort == null) { if (sourcePort == null) {
throw new IllegalArgumentException("sourcePort may not be null"); throw new IllegalArgumentException("sourcePort may not be null");
} }
...@@ -44,16 +46,17 @@ public abstract class AbstractPipe implements IPipe { ...@@ -44,16 +46,17 @@ public abstract class AbstractPipe implements IPipe {
this.sourcePort = sourcePort; this.sourcePort = sourcePort;
this.targetPort = targetPort; this.targetPort = targetPort;
this.capacity = capacity;
this.cachedTargetStage = targetPort.getOwningStage(); this.cachedTargetStage = targetPort.getOwningStage();
} }
@Override @Override
public OutputPort<?> getSourcePort() { public final OutputPort<?> getSourcePort() {
return sourcePort; return sourcePort;
} }
@Override @Override
public InputPort<?> getTargetPort() { public final InputPort<?> getTargetPort() {
return targetPort; return targetPort;
} }
...@@ -61,4 +64,9 @@ public abstract class AbstractPipe implements IPipe { ...@@ -61,4 +64,9 @@ public abstract class AbstractPipe implements IPipe {
public final boolean hasMore() { public final boolean hasMore() {
return !isEmpty(); return !isEmpty();
} }
@Override
public final int capacity() {
return capacity;
}
} }
...@@ -72,8 +72,8 @@ class ExecutionInstantiation { ...@@ -72,8 +72,8 @@ class ExecutionInstantiation {
} }
if (threadableStages.contains(targetStage) && targetColor != color) { if (threadableStages.contains(targetStage) && targetColor != color) {
if (pipe.getCapacity() != 0) { if (pipe.capacity() != 0) {
interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.getCapacity()); interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.capacity());
} else { } else {
interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4); interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4);
} }
......
...@@ -95,4 +95,9 @@ public final class DummyPipe implements IPipe { ...@@ -95,4 +95,9 @@ public final class DummyPipe implements IPipe {
} }
@Override
public int capacity() {
return 0;
}
} }
...@@ -23,6 +23,8 @@ public interface IMonitorablePipe { ...@@ -23,6 +23,8 @@ public interface IMonitorablePipe {
int size(); int size();
int capacity();
long getPushThroughput(); long getPushThroughput();
long getPullThroughput(); long getPullThroughput();
......
...@@ -50,10 +50,15 @@ public interface IPipe { ...@@ -50,10 +50,15 @@ public interface IPipe {
boolean isEmpty(); boolean isEmpty();
/** /**
* @return the current number of elements * @return the current number of elements held by this pipe instance
*/ */
int size(); int size();
/**
* @return the maximum number of elements possible to hold by this pipe instance
*/
int capacity();
/** /**
* Retrieves the last element of the pipe and deletes it. * Retrieves the last element of the pipe and deletes it.
* *
......
...@@ -35,7 +35,8 @@ public class InstantiationPipe implements IPipe { ...@@ -35,7 +35,8 @@ public class InstantiationPipe implements IPipe {
targetPort.setPipe(this); targetPort.setPipe(this);
} }
public int getCapacity() { @Override
public int capacity() {
return capacity; return capacity;
} }
......
...@@ -24,7 +24,7 @@ final class RelayTestPipe<T> extends AbstractInterThreadPipe { ...@@ -24,7 +24,7 @@ final class RelayTestPipe<T> extends AbstractInterThreadPipe {
private final ConstructorClosure<T> inputObjectCreator; private final ConstructorClosure<T> inputObjectCreator;
public RelayTestPipe(final int numInputObjects, final ConstructorClosure<T> inputObjectCreator) { public RelayTestPipe(final int numInputObjects, final ConstructorClosure<T> inputObjectCreator) {
super(null, null); super(null, null, Integer.MAX_VALUE);
this.numInputObjects = numInputObjects; this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator; this.inputObjectCreator = inputObjectCreator;
} }
......
...@@ -24,7 +24,7 @@ final class SingleElementPipe extends AbstractIntraThreadPipe { ...@@ -24,7 +24,7 @@ final class SingleElementPipe extends AbstractIntraThreadPipe {
private Object element; private Object element;
<T> SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { <T> SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
super(sourcePort, targetPort); super(sourcePort, targetPort, 1);
} }
@Override @Override
......
...@@ -30,7 +30,7 @@ final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe ...@@ -30,7 +30,7 @@ final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe
private int numWaits; private int numWaits;
<T> SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { <T> SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort); super(sourcePort, targetPort, capacity);
this.queue = new ObservableSpScArrayQueue<Object>(capacity); this.queue = new ObservableSpScArrayQueue<Object>(capacity);
} }
......
...@@ -31,7 +31,7 @@ final class UnboundedSpScPipe extends AbstractInterThreadPipe { ...@@ -31,7 +31,7 @@ final class UnboundedSpScPipe extends AbstractInterThreadPipe {
private final Queue<Object> queue; private final Queue<Object> queue;
<T> UnboundedSpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { <T> UnboundedSpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
super(sourcePort, targetPort); super(sourcePort, targetPort, Integer.MAX_VALUE);
ConcurrentQueueSpec specification = new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT); ConcurrentQueueSpec specification = new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT);
this.queue = QueueFactory.newQueue(specification); this.queue = QueueFactory.newQueue(specification);
} }
......
...@@ -69,6 +69,11 @@ class MergerTestingPipe implements IPipe { ...@@ -69,6 +69,11 @@ class MergerTestingPipe implements IPipe {
return 0; return 0;
} }
@Override
public int capacity() {
return 0;
}
@Override @Override
public Object removeLast() { public Object removeLast() {
return null; return null;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment