From 8289481f0270c07a845234f0d9452268e30f46c9 Mon Sep 17 00:00:00 2001 From: Nelson Tavares de Sousa <stu103017@mail.uni-kiel.de> Date: Fri, 2 Oct 2015 16:09:55 +0200 Subject: [PATCH] fixes #209 renamed pipes --- .../java/teetime/framework/A3PipeInstantiation.java | 12 ++++++------ .../teetime/framework/ExecutionInstantiation.java | 12 ++++++------ .../pipe/{SpScPipe.java => BoundedSynchedPipe.java} | 6 +++--- ...oundedSpScPipe.java => UnboundedSynchedPipe.java} | 4 ++-- .../{SingleElementPipe.java => UnsynchedPipe.java} | 4 ++-- .../basic/distributor/dynamic/CreatePortAction.java | 4 ++-- .../stage/basic/merger/dynamic/CreatePortAction.java | 4 ++-- .../RunnableConsumerStageTestConfiguration.java | 4 ++-- ...SpScPipeTest.java => BoundedSynchedPipeTest.java} | 6 +++--- ...leElementPipeTest.java => UnsynchedPipeTest.java} | 4 ++-- 10 files changed, 30 insertions(+), 30 deletions(-) rename src/main/java/teetime/framework/pipe/{SpScPipe.java => BoundedSynchedPipe.java} (88%) rename src/main/java/teetime/framework/pipe/{UnboundedSpScPipe.java => UnboundedSynchedPipe.java} (90%) rename src/main/java/teetime/framework/pipe/{SingleElementPipe.java => UnsynchedPipe.java} (88%) rename src/test/java/teetime/framework/pipe/{SpScPipeTest.java => BoundedSynchedPipeTest.java} (91%) rename src/test/java/teetime/framework/pipe/{SingleElementPipeTest.java => UnsynchedPipeTest.java} (90%) diff --git a/src/main/java/teetime/framework/A3PipeInstantiation.java b/src/main/java/teetime/framework/A3PipeInstantiation.java index 08afc60d..ff76f4c6 100644 --- a/src/main/java/teetime/framework/A3PipeInstantiation.java +++ b/src/main/java/teetime/framework/A3PipeInstantiation.java @@ -25,9 +25,9 @@ import teetime.framework.Traverser.VisitorBehavior; import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.IPipe; import teetime.framework.pipe.InstantiationPipe; -import teetime.framework.pipe.SingleElementPipe; -import teetime.framework.pipe.SpScPipe; -import teetime.framework.pipe.UnboundedSpScPipe; +import teetime.framework.pipe.UnsynchedPipe; +import teetime.framework.pipe.BoundedSynchedPipe; +import teetime.framework.pipe.UnboundedSynchedPipe; /** * Automatically instantiates the correct pipes @@ -67,19 +67,19 @@ class A3PipeInstantiation implements ITraverserVisitor { if (targetStageThread != null && sourceStageThread != targetStageThread) { // inter if (pipe.capacity() != 0) { - new UnboundedSpScPipe<T>(pipe.getSourcePort(), pipe.getTargetPort()); + new UnboundedSynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connected (bounded) " + pipe.getSourcePort() + " and " + pipe.getTargetPort()); } } else { - new SpScPipe<T>(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity()); + new BoundedSynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connected (unbounded) " + pipe.getSourcePort() + " and " + pipe.getTargetPort()); } } } else { // normal or reflexive pipe => intra - new SingleElementPipe<T>(pipe.getSourcePort(), pipe.getTargetPort()); + new UnsynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connected (unsynch) " + pipe.getSourcePort() + " and " + pipe.getTargetPort()); } diff --git a/src/main/java/teetime/framework/ExecutionInstantiation.java b/src/main/java/teetime/framework/ExecutionInstantiation.java index 75a70ad5..8cea2f23 100644 --- a/src/main/java/teetime/framework/ExecutionInstantiation.java +++ b/src/main/java/teetime/framework/ExecutionInstantiation.java @@ -20,9 +20,9 @@ import java.util.Map; import java.util.Set; import teetime.framework.pipe.InstantiationPipe; -import teetime.framework.pipe.SingleElementPipe; -import teetime.framework.pipe.SpScPipe; -import teetime.framework.pipe.UnboundedSpScPipe; +import teetime.framework.pipe.UnsynchedPipe; +import teetime.framework.pipe.BoundedSynchedPipe; +import teetime.framework.pipe.UnboundedSynchedPipe; class ExecutionInstantiation { @@ -83,9 +83,9 @@ class ExecutionInstantiation { if (threadableStages.contains(targetStage) && targetColor != color) { if (pipe.capacity() != 0) { - new SpScPipe(outputPort, pipe.getTargetPort(), pipe.capacity()); + new BoundedSynchedPipe(outputPort, pipe.getTargetPort(), pipe.capacity()); } else { - new UnboundedSpScPipe(outputPort, pipe.getTargetPort()); + new UnboundedSynchedPipe(outputPort, pipe.getTargetPort()); } numCreatedConnections = 0; } else { @@ -94,7 +94,7 @@ class ExecutionInstantiation { throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") } } - new SingleElementPipe(outputPort, pipe.getTargetPort()); + new UnsynchedPipe(outputPort, pipe.getTargetPort()); colors.put(targetStage, color); numCreatedConnections = colorAndConnectStages(targetStage); } diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/BoundedSynchedPipe.java similarity index 88% rename from src/main/java/teetime/framework/pipe/SpScPipe.java rename to src/main/java/teetime/framework/pipe/BoundedSynchedPipe.java index 299e9eaa..9af9a8c7 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/BoundedSynchedPipe.java @@ -22,7 +22,7 @@ import teetime.framework.StageState; import teetime.framework.exceptionHandling.TerminateException; import teetime.util.framework.concurrent.queue.ObservableSpScArrayQueue; -public class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe { +public class BoundedSynchedPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe { // private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class); @@ -30,12 +30,12 @@ public class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorab // statistics private int numWaits; - public SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + public BoundedSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { super(sourcePort, targetPort, capacity); this.queue = new ObservableSpScArrayQueue<Object>(capacity); } - public SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + public BoundedSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { this(sourcePort, targetPort, 4); } diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java b/src/main/java/teetime/framework/pipe/UnboundedSynchedPipe.java similarity index 90% rename from src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java rename to src/main/java/teetime/framework/pipe/UnboundedSynchedPipe.java index 623b2e80..08ebc567 100644 --- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java +++ b/src/main/java/teetime/framework/pipe/UnboundedSynchedPipe.java @@ -26,11 +26,11 @@ import teetime.framework.AbstractInterThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; -public final class UnboundedSpScPipe<T> extends AbstractInterThreadPipe<T> { +public final class UnboundedSynchedPipe<T> extends AbstractInterThreadPipe<T> { private final Queue<Object> queue; - public UnboundedSpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + public UnboundedSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { super(sourcePort, targetPort, Integer.MAX_VALUE); ConcurrentQueueSpec specification = new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT); this.queue = QueueFactory.newQueue(specification); diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipe.java b/src/main/java/teetime/framework/pipe/UnsynchedPipe.java similarity index 88% rename from src/main/java/teetime/framework/pipe/SingleElementPipe.java rename to src/main/java/teetime/framework/pipe/UnsynchedPipe.java index 4ff42560..7fdc5908 100644 --- a/src/main/java/teetime/framework/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/framework/pipe/UnsynchedPipe.java @@ -19,11 +19,11 @@ import teetime.framework.AbstractIntraThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; -public final class SingleElementPipe<T> extends AbstractIntraThreadPipe<T> { +public final class UnsynchedPipe<T> extends AbstractIntraThreadPipe<T> { private Object element; - public SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + public UnsynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { super(sourcePort, targetPort, 1); } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java index 02cba34f..5fa26368 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java @@ -21,7 +21,7 @@ import java.util.List; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.RuntimeServiceFacade; -import teetime.framework.pipe.SpScPipe; +import teetime.framework.pipe.BoundedSynchedPipe; import teetime.framework.signal.StartingSignal; import teetime.util.framework.port.PortAction; @@ -44,7 +44,7 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { } private void processOutputPort(final DynamicDistributor<T> dynamicDistributor, final OutputPort<T> newOutputPort) { - new SpScPipe<T>(newOutputPort, inputPort); + new BoundedSynchedPipe<T>(newOutputPort, inputPort); RuntimeServiceFacade.INSTANCE.startWithinNewThread(dynamicDistributor, inputPort.getOwningStage()); diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java index bce7550c..3a38b286 100644 --- a/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java @@ -17,7 +17,7 @@ package teetime.stage.basic.merger.dynamic; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.pipe.SpScPipe; +import teetime.framework.pipe.BoundedSynchedPipe; import teetime.util.framework.port.PortAction; public class CreatePortAction<T> implements PortAction<DynamicMerger<T>> { @@ -36,6 +36,6 @@ public class CreatePortAction<T> implements PortAction<DynamicMerger<T>> { } private void onInputPortCreated(final InputPort<T> newInputPort) { - new SpScPipe<T>(outputPort, newInputPort); + new BoundedSynchedPipe<T>(outputPort, newInputPort); } } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index 40bdd787..09a90d5e 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -19,7 +19,7 @@ import java.util.ArrayList; import java.util.List; import teetime.framework.pipe.IPipe; -import teetime.framework.pipe.SpScPipe; +import teetime.framework.pipe.BoundedSynchedPipe; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; @@ -38,7 +38,7 @@ public class RunnableConsumerStageTestConfiguration extends Configuration { collectorSink.declareActive(); // Can not use createPorts, as the if condition above will lead to an exception - IPipe pipe = new SpScPipe(producer.getOutputPort(), collectorSink.getInputPort()); + IPipe pipe = new BoundedSynchedPipe(producer.getOutputPort(), collectorSink.getInputPort()); registerCustomPipe((AbstractPipe<?>) pipe); this.collectorSink = collectorSink; diff --git a/src/test/java/teetime/framework/pipe/SpScPipeTest.java b/src/test/java/teetime/framework/pipe/BoundedSynchedPipeTest.java similarity index 91% rename from src/test/java/teetime/framework/pipe/SpScPipeTest.java rename to src/test/java/teetime/framework/pipe/BoundedSynchedPipeTest.java index 1953bae4..fcbb3ada 100644 --- a/src/test/java/teetime/framework/pipe/SpScPipeTest.java +++ b/src/test/java/teetime/framework/pipe/BoundedSynchedPipeTest.java @@ -32,7 +32,7 @@ import teetime.framework.signal.TerminatingSignal; import teetime.framework.signal.ValidatingSignal; import teetime.stage.basic.merger.Merger; -public class SpScPipeTest { +public class BoundedSynchedPipeTest { // @Ignore // ignore as long as this test passes null ports to SpScPipe @@ -41,7 +41,7 @@ public class SpScPipeTest { Merger<Object> portSource = new Merger<Object>(); OutputPort<Object> sourcePort = portSource.getOutputPort(); InputPort<Object> targetPort = portSource.getNewInputPort(); - AbstractInterThreadPipe pipe = new SpScPipe(sourcePort, targetPort, 1); // IPipe does not provide getSignal method + AbstractInterThreadPipe pipe = new BoundedSynchedPipe(sourcePort, targetPort, 1); // IPipe does not provide getSignal method List<ISignal> signals = new ArrayList<ISignal>(); signals.add(new StartingSignal()); @@ -71,7 +71,7 @@ public class SpScPipeTest { @Test(expected = IllegalArgumentException.class) public void testAdd() throws Exception { - SpScPipe pipe = new SpScPipe(null, null, 4); + BoundedSynchedPipe pipe = new BoundedSynchedPipe(null, null, 4); assertFalse(pipe.add(null)); } } diff --git a/src/test/java/teetime/framework/pipe/SingleElementPipeTest.java b/src/test/java/teetime/framework/pipe/UnsynchedPipeTest.java similarity index 90% rename from src/test/java/teetime/framework/pipe/SingleElementPipeTest.java rename to src/test/java/teetime/framework/pipe/UnsynchedPipeTest.java index 03978f0e..71846916 100644 --- a/src/test/java/teetime/framework/pipe/SingleElementPipeTest.java +++ b/src/test/java/teetime/framework/pipe/UnsynchedPipeTest.java @@ -19,11 +19,11 @@ import static org.junit.Assert.assertFalse; import org.junit.Test; -public class SingleElementPipeTest { +public class UnsynchedPipeTest { @Test(expected = IllegalArgumentException.class) public void testAdd() throws Exception { - SingleElementPipe pipe = new SingleElementPipe(null, null); + UnsynchedPipe pipe = new UnsynchedPipe(null, null); assertFalse(pipe.add(null)); } -- GitLab