diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractSynchedPipe.java similarity index 93% rename from src/main/java/teetime/framework/AbstractInterThreadPipe.java rename to src/main/java/teetime/framework/AbstractSynchedPipe.java index f723c2fe989a64aaee2be80dafb7e801730328cc..57d625d51500e8c34db37b64c04dcc8f8225326c 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractSynchedPipe.java @@ -31,13 +31,13 @@ import teetime.util.framework.concurrent.queue.putstrategy.YieldPutStrategy; import teetime.util.framework.concurrent.queue.takestrategy.SCParkTakeStrategy; import teetime.util.framework.concurrent.queue.takestrategy.TakeStrategy; -public abstract class AbstractInterThreadPipe<T> extends AbstractPipe<T> { +public abstract class AbstractSynchedPipe<T> extends AbstractPipe<T> { private final BlockingQueue<ISignal> signalQueue; private volatile boolean closed; - protected AbstractInterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + protected AbstractSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { super(sourcePort, targetPort, capacity); final Queue<ISignal> localSignalQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT)); final PutStrategy<ISignal> putStrategy = new YieldPutStrategy<ISignal>(); diff --git a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java b/src/main/java/teetime/framework/AbstractUnsynchedPipe.java similarity index 88% rename from src/main/java/teetime/framework/AbstractIntraThreadPipe.java rename to src/main/java/teetime/framework/AbstractUnsynchedPipe.java index 29d5c8fd532107c8b047a55f01f4be16b9892a8c..6f5642a8899bb46abd799e61e29a74e607cebea9 100644 --- a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractUnsynchedPipe.java @@ -17,11 +17,11 @@ package teetime.framework; import teetime.framework.signal.ISignal; -public abstract class AbstractIntraThreadPipe<T> extends AbstractPipe<T> { +public abstract class AbstractUnsynchedPipe<T> extends AbstractPipe<T> { private boolean closed; - protected AbstractIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + protected AbstractUnsynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { super(sourcePort, targetPort, capacity); } diff --git a/src/main/java/teetime/framework/pipe/BoundedSynchedPipe.java b/src/main/java/teetime/framework/pipe/BoundedSynchedPipe.java index 9af9a8c77673d7c96a250570117cd39b758a8896..3feed1a096afb54f1d825c65aacfef13efb3b40f 100644 --- a/src/main/java/teetime/framework/pipe/BoundedSynchedPipe.java +++ b/src/main/java/teetime/framework/pipe/BoundedSynchedPipe.java @@ -15,14 +15,14 @@ */ package teetime.framework.pipe; -import teetime.framework.AbstractInterThreadPipe; +import teetime.framework.AbstractSynchedPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.StageState; import teetime.framework.exceptionHandling.TerminateException; import teetime.util.framework.concurrent.queue.ObservableSpScArrayQueue; -public class BoundedSynchedPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe { +public class BoundedSynchedPipe<T> extends AbstractSynchedPipe<T>implements IMonitorablePipe { // private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class); diff --git a/src/main/java/teetime/framework/pipe/UnboundedSynchedPipe.java b/src/main/java/teetime/framework/pipe/UnboundedSynchedPipe.java index 08ebc56757a059ba6d89597e97a34aa7f0388eea..487886fba0844c0d949acc11f4fb8c98dd466e0f 100644 --- a/src/main/java/teetime/framework/pipe/UnboundedSynchedPipe.java +++ b/src/main/java/teetime/framework/pipe/UnboundedSynchedPipe.java @@ -22,11 +22,11 @@ import org.jctools.queues.spec.ConcurrentQueueSpec; import org.jctools.queues.spec.Ordering; import org.jctools.queues.spec.Preference; -import teetime.framework.AbstractInterThreadPipe; +import teetime.framework.AbstractSynchedPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; -public final class UnboundedSynchedPipe<T> extends AbstractInterThreadPipe<T> { +public final class UnboundedSynchedPipe<T> extends AbstractSynchedPipe<T> { private final Queue<Object> queue; diff --git a/src/main/java/teetime/framework/pipe/UnsynchedPipe.java b/src/main/java/teetime/framework/pipe/UnsynchedPipe.java index 7fdc590816b2c1158b4e3ea05be4627c9355ea7c..f5176e4d76cbdaf657f6cff7171364b059b4e8c3 100644 --- a/src/main/java/teetime/framework/pipe/UnsynchedPipe.java +++ b/src/main/java/teetime/framework/pipe/UnsynchedPipe.java @@ -15,11 +15,11 @@ */ package teetime.framework.pipe; -import teetime.framework.AbstractIntraThreadPipe; +import teetime.framework.AbstractUnsynchedPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; -public final class UnsynchedPipe<T> extends AbstractIntraThreadPipe<T> { +public final class UnsynchedPipe<T> extends AbstractUnsynchedPipe<T> { private Object element; diff --git a/src/test/java/teetime/framework/pipe/BoundedSynchedPipeTest.java b/src/test/java/teetime/framework/pipe/BoundedSynchedPipeTest.java index fcbb3adaeb41cf8436fa2e1fb29158d89540e728..bb4d2625906225966cde479a07a0bfaa3cd67e95 100644 --- a/src/test/java/teetime/framework/pipe/BoundedSynchedPipeTest.java +++ b/src/test/java/teetime/framework/pipe/BoundedSynchedPipeTest.java @@ -23,7 +23,7 @@ import java.util.List; import org.junit.Test; -import teetime.framework.AbstractInterThreadPipe; +import teetime.framework.AbstractSynchedPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.signal.ISignal; @@ -41,7 +41,7 @@ public class BoundedSynchedPipeTest { Merger<Object> portSource = new Merger<Object>(); OutputPort<Object> sourcePort = portSource.getOutputPort(); InputPort<Object> targetPort = portSource.getNewInputPort(); - AbstractInterThreadPipe pipe = new BoundedSynchedPipe(sourcePort, targetPort, 1); // IPipe does not provide getSignal method + AbstractSynchedPipe pipe = new BoundedSynchedPipe(sourcePort, targetPort, 1); // IPipe does not provide getSignal method List<ISignal> signals = new ArrayList<ISignal>(); signals.add(new StartingSignal());