diff --git a/src/main/java/teetime/framework/pipe/WorkStealingPipe.java b/src/main/java/teetime/framework/pipe/WorkStealingPipe.java index 263c93bbab4f040f91259e2df3325da2099908d1..5a2f6c629414e026fc4f4f00818b3f3c5198969a 100644 --- a/src/main/java/teetime/framework/pipe/WorkStealingPipe.java +++ b/src/main/java/teetime/framework/pipe/WorkStealingPipe.java @@ -60,7 +60,7 @@ public final class WorkStealingPipe<T> extends AbstractSynchedPipe<T> implements Object obj = this.queue.poll(); if (obj != null) { return obj; - } else { + } else if (haulSize > 0) { int index; WorkStealingPipe<T> victim; if (busyCheck) { @@ -96,6 +96,9 @@ public final class WorkStealingPipe<T> extends AbstractSynchedPipe<T> implements } } return this.queue.poll(); + } else { + // haulSize is 0 and we're not permitted to steal: + return null; } } diff --git a/src/main/java/teetime/framework/pipe/WorkStealingPipeFactory.java b/src/main/java/teetime/framework/pipe/WorkStealingPipeFactory.java index 39b8014b316b5d6168163ca379cdabc03e723fb7..c69f23c9a2f914d42590ce74ccdc504c841af236 100644 --- a/src/main/java/teetime/framework/pipe/WorkStealingPipeFactory.java +++ b/src/main/java/teetime/framework/pipe/WorkStealingPipeFactory.java @@ -82,4 +82,12 @@ public class WorkStealingPipeFactory<T> { return create(source, target, 0, 1, true); } + /** + * Create a pipe utilizing no work stealing. This pipe is only able to become a victim and no theft will be attempted. + * Mimicks a {@link BoundedSynchedPipe} in behaviour. + */ + public WorkStealingPipe<T> createNWSPipe(final OutputPort<? extends T> source, final InputPort<T> target) { + return create(source, target, 0, 0, false); + } + }