diff --git a/src/main/java/teetime/framework/A3PipeInstantiation.java b/src/main/java/teetime/framework/A3PipeInstantiation.java index 08afc60dd4b0a9e47014bf73c40e8dd11a71ccf9..ff76f4c66baf64876321512b3a2579b42f004e90 100644 --- a/src/main/java/teetime/framework/A3PipeInstantiation.java +++ b/src/main/java/teetime/framework/A3PipeInstantiation.java @@ -25,9 +25,9 @@ import teetime.framework.Traverser.VisitorBehavior; import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.IPipe; import teetime.framework.pipe.InstantiationPipe; -import teetime.framework.pipe.SingleElementPipe; -import teetime.framework.pipe.SpScPipe; -import teetime.framework.pipe.UnboundedSpScPipe; +import teetime.framework.pipe.UnsynchedPipe; +import teetime.framework.pipe.BoundedSynchedPipe; +import teetime.framework.pipe.UnboundedSynchedPipe; /** * Automatically instantiates the correct pipes @@ -67,19 +67,19 @@ class A3PipeInstantiation implements ITraverserVisitor { if (targetStageThread != null && sourceStageThread != targetStageThread) { // inter if (pipe.capacity() != 0) { - new UnboundedSpScPipe<T>(pipe.getSourcePort(), pipe.getTargetPort()); + new UnboundedSynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connected (bounded) " + pipe.getSourcePort() + " and " + pipe.getTargetPort()); } } else { - new SpScPipe<T>(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity()); + new BoundedSynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connected (unbounded) " + pipe.getSourcePort() + " and " + pipe.getTargetPort()); } } } else { // normal or reflexive pipe => intra - new SingleElementPipe<T>(pipe.getSourcePort(), pipe.getTargetPort()); + new UnsynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connected (unsynch) " + pipe.getSourcePort() + " and " + pipe.getTargetPort()); } diff --git a/src/main/java/teetime/framework/ExecutionInstantiation.java b/src/main/java/teetime/framework/ExecutionInstantiation.java index 75a70ad512e64eec62c204bcffd1c3e07ac928d2..8cea2f2313aaafcfe7f81b3bf9e24c9cc0d5ef75 100644 --- a/src/main/java/teetime/framework/ExecutionInstantiation.java +++ b/src/main/java/teetime/framework/ExecutionInstantiation.java @@ -20,9 +20,9 @@ import java.util.Map; import java.util.Set; import teetime.framework.pipe.InstantiationPipe; -import teetime.framework.pipe.SingleElementPipe; -import teetime.framework.pipe.SpScPipe; -import teetime.framework.pipe.UnboundedSpScPipe; +import teetime.framework.pipe.UnsynchedPipe; +import teetime.framework.pipe.BoundedSynchedPipe; +import teetime.framework.pipe.UnboundedSynchedPipe; class ExecutionInstantiation { @@ -83,9 +83,9 @@ class ExecutionInstantiation { if (threadableStages.contains(targetStage) && targetColor != color) { if (pipe.capacity() != 0) { - new SpScPipe(outputPort, pipe.getTargetPort(), pipe.capacity()); + new BoundedSynchedPipe(outputPort, pipe.getTargetPort(), pipe.capacity()); } else { - new UnboundedSpScPipe(outputPort, pipe.getTargetPort()); + new UnboundedSynchedPipe(outputPort, pipe.getTargetPort()); } numCreatedConnections = 0; } else { @@ -94,7 +94,7 @@ class ExecutionInstantiation { throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") } } - new SingleElementPipe(outputPort, pipe.getTargetPort()); + new UnsynchedPipe(outputPort, pipe.getTargetPort()); colors.put(targetStage, color); numCreatedConnections = colorAndConnectStages(targetStage); } diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/BoundedSynchedPipe.java similarity index 88% rename from src/main/java/teetime/framework/pipe/SpScPipe.java rename to src/main/java/teetime/framework/pipe/BoundedSynchedPipe.java index 299e9eaa1f3e4fb798dbb55c70e5fd0cc01d90b4..9af9a8c77673d7c96a250570117cd39b758a8896 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/BoundedSynchedPipe.java @@ -22,7 +22,7 @@ import teetime.framework.StageState; import teetime.framework.exceptionHandling.TerminateException; import teetime.util.framework.concurrent.queue.ObservableSpScArrayQueue; -public class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe { +public class BoundedSynchedPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe { // private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class); @@ -30,12 +30,12 @@ public class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorab // statistics private int numWaits; - public SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + public BoundedSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { super(sourcePort, targetPort, capacity); this.queue = new ObservableSpScArrayQueue<Object>(capacity); } - public SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + public BoundedSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { this(sourcePort, targetPort, 4); } diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java b/src/main/java/teetime/framework/pipe/UnboundedSynchedPipe.java similarity index 90% rename from src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java rename to src/main/java/teetime/framework/pipe/UnboundedSynchedPipe.java index 623b2e809eca1dfb74eca5be9dfabbff474af156..08ebc56757a059ba6d89597e97a34aa7f0388eea 100644 --- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java +++ b/src/main/java/teetime/framework/pipe/UnboundedSynchedPipe.java @@ -26,11 +26,11 @@ import teetime.framework.AbstractInterThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; -public final class UnboundedSpScPipe<T> extends AbstractInterThreadPipe<T> { +public final class UnboundedSynchedPipe<T> extends AbstractInterThreadPipe<T> { private final Queue<Object> queue; - public UnboundedSpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + public UnboundedSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { super(sourcePort, targetPort, Integer.MAX_VALUE); ConcurrentQueueSpec specification = new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT); this.queue = QueueFactory.newQueue(specification); diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipe.java b/src/main/java/teetime/framework/pipe/UnsynchedPipe.java similarity index 88% rename from src/main/java/teetime/framework/pipe/SingleElementPipe.java rename to src/main/java/teetime/framework/pipe/UnsynchedPipe.java index 4ff42560e25cc6a2127ccf80939c3fb47846c5eb..7fdc590816b2c1158b4e3ea05be4627c9355ea7c 100644 --- a/src/main/java/teetime/framework/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/framework/pipe/UnsynchedPipe.java @@ -19,11 +19,11 @@ import teetime.framework.AbstractIntraThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; -public final class SingleElementPipe<T> extends AbstractIntraThreadPipe<T> { +public final class UnsynchedPipe<T> extends AbstractIntraThreadPipe<T> { private Object element; - public SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + public UnsynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { super(sourcePort, targetPort, 1); } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java index 02cba34fcbe5db68e996a10503754c8be1f7e907..5fa263683717f12f9edb99ddb90766712067fb34 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java @@ -21,7 +21,7 @@ import java.util.List; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.RuntimeServiceFacade; -import teetime.framework.pipe.SpScPipe; +import teetime.framework.pipe.BoundedSynchedPipe; import teetime.framework.signal.StartingSignal; import teetime.util.framework.port.PortAction; @@ -44,7 +44,7 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { } private void processOutputPort(final DynamicDistributor<T> dynamicDistributor, final OutputPort<T> newOutputPort) { - new SpScPipe<T>(newOutputPort, inputPort); + new BoundedSynchedPipe<T>(newOutputPort, inputPort); RuntimeServiceFacade.INSTANCE.startWithinNewThread(dynamicDistributor, inputPort.getOwningStage()); diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java index bce7550c3447bcf9a70e9becd9b7d2e652763d2d..3a38b286e97a34c92b4038726862c9fe885c95a1 100644 --- a/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java @@ -17,7 +17,7 @@ package teetime.stage.basic.merger.dynamic; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.pipe.SpScPipe; +import teetime.framework.pipe.BoundedSynchedPipe; import teetime.util.framework.port.PortAction; public class CreatePortAction<T> implements PortAction<DynamicMerger<T>> { @@ -36,6 +36,6 @@ public class CreatePortAction<T> implements PortAction<DynamicMerger<T>> { } private void onInputPortCreated(final InputPort<T> newInputPort) { - new SpScPipe<T>(outputPort, newInputPort); + new BoundedSynchedPipe<T>(outputPort, newInputPort); } } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index 40bdd787851b2fd27753d9d4492ffea2ef867d2c..09a90d5eb32298d746d4af7d3de1773d6158ec86 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -19,7 +19,7 @@ import java.util.ArrayList; import java.util.List; import teetime.framework.pipe.IPipe; -import teetime.framework.pipe.SpScPipe; +import teetime.framework.pipe.BoundedSynchedPipe; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; @@ -38,7 +38,7 @@ public class RunnableConsumerStageTestConfiguration extends Configuration { collectorSink.declareActive(); // Can not use createPorts, as the if condition above will lead to an exception - IPipe pipe = new SpScPipe(producer.getOutputPort(), collectorSink.getInputPort()); + IPipe pipe = new BoundedSynchedPipe(producer.getOutputPort(), collectorSink.getInputPort()); registerCustomPipe((AbstractPipe<?>) pipe); this.collectorSink = collectorSink; diff --git a/src/test/java/teetime/framework/pipe/SpScPipeTest.java b/src/test/java/teetime/framework/pipe/BoundedSynchedPipeTest.java similarity index 91% rename from src/test/java/teetime/framework/pipe/SpScPipeTest.java rename to src/test/java/teetime/framework/pipe/BoundedSynchedPipeTest.java index 1953bae41dadd051861df03b6a7787775d716bad..fcbb3adaeb41cf8436fa2e1fb29158d89540e728 100644 --- a/src/test/java/teetime/framework/pipe/SpScPipeTest.java +++ b/src/test/java/teetime/framework/pipe/BoundedSynchedPipeTest.java @@ -32,7 +32,7 @@ import teetime.framework.signal.TerminatingSignal; import teetime.framework.signal.ValidatingSignal; import teetime.stage.basic.merger.Merger; -public class SpScPipeTest { +public class BoundedSynchedPipeTest { // @Ignore // ignore as long as this test passes null ports to SpScPipe @@ -41,7 +41,7 @@ public class SpScPipeTest { Merger<Object> portSource = new Merger<Object>(); OutputPort<Object> sourcePort = portSource.getOutputPort(); InputPort<Object> targetPort = portSource.getNewInputPort(); - AbstractInterThreadPipe pipe = new SpScPipe(sourcePort, targetPort, 1); // IPipe does not provide getSignal method + AbstractInterThreadPipe pipe = new BoundedSynchedPipe(sourcePort, targetPort, 1); // IPipe does not provide getSignal method List<ISignal> signals = new ArrayList<ISignal>(); signals.add(new StartingSignal()); @@ -71,7 +71,7 @@ public class SpScPipeTest { @Test(expected = IllegalArgumentException.class) public void testAdd() throws Exception { - SpScPipe pipe = new SpScPipe(null, null, 4); + BoundedSynchedPipe pipe = new BoundedSynchedPipe(null, null, 4); assertFalse(pipe.add(null)); } } diff --git a/src/test/java/teetime/framework/pipe/SingleElementPipeTest.java b/src/test/java/teetime/framework/pipe/UnsynchedPipeTest.java similarity index 90% rename from src/test/java/teetime/framework/pipe/SingleElementPipeTest.java rename to src/test/java/teetime/framework/pipe/UnsynchedPipeTest.java index 03978f0ea54d2b1a5c1b2dddcaac3944ce8b0502..71846916f181002cdc8fc231d40283d59d86bf2f 100644 --- a/src/test/java/teetime/framework/pipe/SingleElementPipeTest.java +++ b/src/test/java/teetime/framework/pipe/UnsynchedPipeTest.java @@ -19,11 +19,11 @@ import static org.junit.Assert.assertFalse; import org.junit.Test; -public class SingleElementPipeTest { +public class UnsynchedPipeTest { @Test(expected = IllegalArgumentException.class) public void testAdd() throws Exception { - SingleElementPipe pipe = new SingleElementPipe(null, null); + UnsynchedPipe pipe = new UnsynchedPipe(null, null); assertFalse(pipe.add(null)); }