diff --git a/src/main/java/teetime/framework/A3PipeInstantiation.java b/src/main/java/teetime/framework/A3PipeInstantiation.java index c11b6f2385ad697a1752b2b57e2bbc0c6f831b06..ff76f4c66baf64876321512b3a2579b42f004e90 100644 --- a/src/main/java/teetime/framework/A3PipeInstantiation.java +++ b/src/main/java/teetime/framework/A3PipeInstantiation.java @@ -24,11 +24,10 @@ import org.slf4j.LoggerFactory; import teetime.framework.Traverser.VisitorBehavior; import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.IPipe; -import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.InstantiationPipe; -import teetime.framework.pipe.SingleElementPipeFactory; -import teetime.framework.pipe.SpScPipeFactory; -import teetime.framework.pipe.UnboundedSpScPipeFactory; +import teetime.framework.pipe.UnsynchedPipe; +import teetime.framework.pipe.BoundedSynchedPipe; +import teetime.framework.pipe.UnboundedSynchedPipe; /** * Automatically instantiates the correct pipes @@ -37,10 +36,6 @@ class A3PipeInstantiation implements ITraverserVisitor { private static final Logger LOGGER = LoggerFactory.getLogger(A3PipeInstantiation.class); - private static final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); - private static final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); - private static final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); - private final Set<IPipe<?>> visitedPipes = new HashSet<IPipe<?>>(); @Override @@ -72,19 +67,19 @@ class A3PipeInstantiation implements ITraverserVisitor { if (targetStageThread != null && sourceStageThread != targetStageThread) { // inter if (pipe.capacity() != 0) { - interBoundedThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity()); + new UnboundedSynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connected (bounded) " + pipe.getSourcePort() + " and " + pipe.getTargetPort()); } } else { - interUnboundedThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), 4); + new BoundedSynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connected (unbounded) " + pipe.getSourcePort() + " and " + pipe.getTargetPort()); } } } else { // normal or reflexive pipe => intra - intraThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), 4); + new UnsynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connected (unsynch) " + pipe.getSourcePort() + " and " + pipe.getTargetPort()); } diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractSynchedPipe.java similarity index 93% rename from src/main/java/teetime/framework/AbstractInterThreadPipe.java rename to src/main/java/teetime/framework/AbstractSynchedPipe.java index 0592066dce80ec2e1651cbf10d3f15a5f065083b..ce6f322a1ecbb2e269afe4308765166149b90044 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractSynchedPipe.java @@ -31,13 +31,13 @@ import teetime.util.framework.concurrent.queue.putstrategy.YieldPutStrategy; import teetime.util.framework.concurrent.queue.takestrategy.SCParkTakeStrategy; import teetime.util.framework.concurrent.queue.takestrategy.TakeStrategy; -public abstract class AbstractInterThreadPipe<T> extends AbstractPipe<T> { +public abstract class AbstractSynchedPipe<T> extends AbstractPipe<T> { private final BlockingQueue<ISignal> signalQueue; private volatile boolean closed; - protected AbstractInterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + protected AbstractSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { super(sourcePort, targetPort, capacity); final Queue<ISignal> localSignalQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT)); final PutStrategy<ISignal> putStrategy = new YieldPutStrategy<ISignal>(); diff --git a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java b/src/main/java/teetime/framework/AbstractUnsynchedPipe.java similarity index 88% rename from src/main/java/teetime/framework/AbstractIntraThreadPipe.java rename to src/main/java/teetime/framework/AbstractUnsynchedPipe.java index 29d5c8fd532107c8b047a55f01f4be16b9892a8c..6f5642a8899bb46abd799e61e29a74e607cebea9 100644 --- a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractUnsynchedPipe.java @@ -17,11 +17,11 @@ package teetime.framework; import teetime.framework.signal.ISignal; -public abstract class AbstractIntraThreadPipe<T> extends AbstractPipe<T> { +public abstract class AbstractUnsynchedPipe<T> extends AbstractPipe<T> { private boolean closed; - protected AbstractIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + protected AbstractUnsynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { super(sourcePort, targetPort, capacity); } diff --git a/src/main/java/teetime/framework/ExecutionInstantiation.java b/src/main/java/teetime/framework/ExecutionInstantiation.java index e30c1ee0daa01776269e6554ac6401c0e8e174b3..90a1473cadb0d91a90b0c7fb049f2e906b28c88c 100644 --- a/src/main/java/teetime/framework/ExecutionInstantiation.java +++ b/src/main/java/teetime/framework/ExecutionInstantiation.java @@ -19,18 +19,14 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; -import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.InstantiationPipe; -import teetime.framework.pipe.SingleElementPipeFactory; -import teetime.framework.pipe.SpScPipeFactory; -import teetime.framework.pipe.UnboundedSpScPipeFactory; +import teetime.framework.pipe.UnsynchedPipe; +import teetime.framework.pipe.BoundedSynchedPipe; +import teetime.framework.pipe.UnboundedSynchedPipe; class ExecutionInstantiation { private static final int DEFAULT_COLOR = 0; - private static final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); - private static final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); - private static final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); private final ConfigurationContext context; @@ -87,9 +83,9 @@ class ExecutionInstantiation { if (threadableStages.contains(targetStage) && targetColor != color) { if (pipe.capacity() != 0) { - interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.capacity()); + new BoundedSynchedPipe(outputPort, pipe.getTargetPort(), pipe.capacity()); } else { - interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4); + new UnboundedSynchedPipe(outputPort, pipe.getTargetPort()); } numCreatedConnections = 0; } else { @@ -99,7 +95,7 @@ class ExecutionInstantiation { // (but not its "headstage") } } - intraThreadPipeFactory.create(outputPort, pipe.getTargetPort()); + new UnsynchedPipe(outputPort, pipe.getTargetPort()); colors.put(targetStage, color); numCreatedConnections = colorAndConnectStages(targetStage); } diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/BoundedSynchedPipe.java similarity index 85% rename from src/main/java/teetime/framework/pipe/SpScPipe.java rename to src/main/java/teetime/framework/pipe/BoundedSynchedPipe.java index 98fc125b0464407b90c389420bfc0ebef9d09ec0..3feed1a096afb54f1d825c65aacfef13efb3b40f 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/BoundedSynchedPipe.java @@ -15,14 +15,14 @@ */ package teetime.framework.pipe; -import teetime.framework.AbstractInterThreadPipe; +import teetime.framework.AbstractSynchedPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.StageState; import teetime.framework.exceptionHandling.TerminateException; import teetime.util.framework.concurrent.queue.ObservableSpScArrayQueue; -class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe { +public class BoundedSynchedPipe<T> extends AbstractSynchedPipe<T>implements IMonitorablePipe { // private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class); @@ -30,11 +30,15 @@ class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe // statistics private int numWaits; - SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + public BoundedSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { super(sourcePort, targetPort, capacity); this.queue = new ObservableSpScArrayQueue<Object>(capacity); } + public BoundedSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + this(sourcePort, targetPort, 4); + } + // BETTER introduce a QueueIsFullStrategy @Override public boolean add(final Object element) { diff --git a/src/main/java/teetime/framework/pipe/IPipeFactory.java b/src/main/java/teetime/framework/pipe/IPipeFactory.java deleted file mode 100644 index 12eff9f17f8b22ab65493169ee61d5afe9e17eb1..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/pipe/IPipeFactory.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.pipe; - -import teetime.framework.InputPort; -import teetime.framework.OutputPort; - -/** - * Represents the interface, which is must be defined in every PipeFactory - */ -public interface IPipeFactory { - - /** - * Connects two stages with a pipe of default capacity. - * - * @param sourcePort - * OutputPort of the stage, which produces data. - * @param targetPort - * Input port of the receiving stage. - * @param <T> - * type of elements which traverse this pipe - * - * @return The connecting pipe. - */ - <T> IPipe<T> create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort); - - /** - * Connects two stages with a pipe. - * - * @param sourcePort - * OutputPort of the stage, which produces data. - * @param targetPort - * Input port of the receiving stage. - * @param capacity - * Number of elements the pipe can carry. - * @param <T> - * type of elements which traverse this pipe - * @return The connecting pipe. - */ - <T> IPipe<T> create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort, int capacity); - - /** - * @return Whether or not the created pipes are growable - */ - boolean isGrowable(); - -} diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java b/src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java deleted file mode 100644 index 4b81cdb1f9ffef8758fc2f5c74ba839d2037d37a..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.pipe; - -import teetime.framework.InputPort; -import teetime.framework.OutputPort; - -public final class SingleElementPipeFactory implements IPipeFactory { - - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - return this.create(sourcePort, targetPort, 1); - } - - /** - * Hint: The capacity for this pipe implementation is ignored. - * <p> - * {@inheritDoc} - */ - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - return new SingleElementPipe<T>(sourcePort, targetPort); - } - - @Override - public boolean isGrowable() { - return false; - } - -} diff --git a/src/main/java/teetime/framework/pipe/SpScPipeFactory.java b/src/main/java/teetime/framework/pipe/SpScPipeFactory.java deleted file mode 100644 index 1b37ec169f777bc22d0113f32f494e4b00165961..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/pipe/SpScPipeFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.pipe; - -import teetime.framework.InputPort; -import teetime.framework.OutputPort; - -public final class SpScPipeFactory implements IPipeFactory { - - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - return this.create(sourcePort, targetPort, 4); - } - - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - return new SpScPipe<T>(sourcePort, targetPort, capacity); - } - - @Override - public boolean isGrowable() { - return false; - } - -} diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java deleted file mode 100644 index c3ce35ee93560af7287a01a690105768d95ba33f..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.pipe; - -import teetime.framework.InputPort; -import teetime.framework.OutputPort; - -public class UnboundedSpScPipeFactory implements IPipeFactory { - - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - return this.create(sourcePort, targetPort, 0); - } - - /** - * {@inheritDoc} - * - * The capacity is ignored. - */ - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - return new UnboundedSpScPipe<T>(sourcePort, targetPort); - } - - @Override - public boolean isGrowable() { - return true; - } - -} diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java b/src/main/java/teetime/framework/pipe/UnboundedSynchedPipe.java similarity index 87% rename from src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java rename to src/main/java/teetime/framework/pipe/UnboundedSynchedPipe.java index a785cd37d95bd6a0c6a62ec9a1e9315702b8d272..487886fba0844c0d949acc11f4fb8c98dd466e0f 100644 --- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java +++ b/src/main/java/teetime/framework/pipe/UnboundedSynchedPipe.java @@ -22,15 +22,15 @@ import org.jctools.queues.spec.ConcurrentQueueSpec; import org.jctools.queues.spec.Ordering; import org.jctools.queues.spec.Preference; -import teetime.framework.AbstractInterThreadPipe; +import teetime.framework.AbstractSynchedPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; -final class UnboundedSpScPipe<T> extends AbstractInterThreadPipe<T> { +public final class UnboundedSynchedPipe<T> extends AbstractSynchedPipe<T> { private final Queue<Object> queue; - UnboundedSpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + public UnboundedSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { super(sourcePort, targetPort, Integer.MAX_VALUE); ConcurrentQueueSpec specification = new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT); this.queue = QueueFactory.newQueue(specification); diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipe.java b/src/main/java/teetime/framework/pipe/UnsynchedPipe.java similarity index 86% rename from src/main/java/teetime/framework/pipe/SingleElementPipe.java rename to src/main/java/teetime/framework/pipe/UnsynchedPipe.java index 6a632983d7161e7d442d53c03f366d63747892b4..f5176e4d76cbdaf657f6cff7171364b059b4e8c3 100644 --- a/src/main/java/teetime/framework/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/framework/pipe/UnsynchedPipe.java @@ -15,15 +15,15 @@ */ package teetime.framework.pipe; -import teetime.framework.AbstractIntraThreadPipe; +import teetime.framework.AbstractUnsynchedPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; -final class SingleElementPipe<T> extends AbstractIntraThreadPipe<T> { +public final class UnsynchedPipe<T> extends AbstractUnsynchedPipe<T> { private Object element; - SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + public UnsynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { super(sourcePort, targetPort, 1); } diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java index fd7e64c6fcbc9f6dde9381e87284d3b7cbef2a47..5fa263683717f12f9edb99ddb90766712067fb34 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java @@ -21,14 +21,12 @@ import java.util.List; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.RuntimeServiceFacade; -import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.pipe.BoundedSynchedPipe; import teetime.framework.signal.StartingSignal; import teetime.util.framework.port.PortAction; public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { - private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory(); - private final InputPort<T> inputPort; private final List<PortActionListener<T>> listeners = new ArrayList<PortActionListener<T>>(); @@ -46,7 +44,7 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { } private void processOutputPort(final DynamicDistributor<T> dynamicDistributor, final OutputPort<T> newOutputPort) { - INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort); + new BoundedSynchedPipe<T>(newOutputPort, inputPort); RuntimeServiceFacade.INSTANCE.startWithinNewThread(dynamicDistributor, inputPort.getOwningStage()); diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java index 5c3878587b11ad2d46f62eba00c156b86f4d0035..3a38b286e97a34c92b4038726862c9fe885c95a1 100644 --- a/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java @@ -17,13 +17,11 @@ package teetime.stage.basic.merger.dynamic; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.pipe.BoundedSynchedPipe; import teetime.util.framework.port.PortAction; public class CreatePortAction<T> implements PortAction<DynamicMerger<T>> { - private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory(); - private final OutputPort<T> outputPort; public CreatePortAction(final OutputPort<T> outputPort) { @@ -38,6 +36,6 @@ public class CreatePortAction<T> implements PortAction<DynamicMerger<T>> { } private void onInputPortCreated(final InputPort<T> newInputPort) { - INTER_THREAD_PIPE_FACTORY.create(outputPort, newInputPort); + new BoundedSynchedPipe<T>(outputPort, newInputPort); } } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index dd34362233f949bcbf94c6bf9fc0181bfe323ca0..09a90d5eb32298d746d4af7d3de1773d6158ec86 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -19,7 +19,7 @@ import java.util.ArrayList; import java.util.List; import teetime.framework.pipe.IPipe; -import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.pipe.BoundedSynchedPipe; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; @@ -38,7 +38,7 @@ public class RunnableConsumerStageTestConfiguration extends Configuration { collectorSink.declareActive(); // Can not use createPorts, as the if condition above will lead to an exception - IPipe pipe = new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort()); + IPipe pipe = new BoundedSynchedPipe(producer.getOutputPort(), collectorSink.getInputPort()); registerCustomPipe((AbstractPipe<?>) pipe); this.collectorSink = collectorSink; diff --git a/src/test/java/teetime/framework/pipe/SpScPipeTest.java b/src/test/java/teetime/framework/pipe/BoundedSynchedPipeTest.java similarity index 89% rename from src/test/java/teetime/framework/pipe/SpScPipeTest.java rename to src/test/java/teetime/framework/pipe/BoundedSynchedPipeTest.java index 1953bae41dadd051861df03b6a7787775d716bad..bb4d2625906225966cde479a07a0bfaa3cd67e95 100644 --- a/src/test/java/teetime/framework/pipe/SpScPipeTest.java +++ b/src/test/java/teetime/framework/pipe/BoundedSynchedPipeTest.java @@ -23,7 +23,7 @@ import java.util.List; import org.junit.Test; -import teetime.framework.AbstractInterThreadPipe; +import teetime.framework.AbstractSynchedPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.signal.ISignal; @@ -32,7 +32,7 @@ import teetime.framework.signal.TerminatingSignal; import teetime.framework.signal.ValidatingSignal; import teetime.stage.basic.merger.Merger; -public class SpScPipeTest { +public class BoundedSynchedPipeTest { // @Ignore // ignore as long as this test passes null ports to SpScPipe @@ -41,7 +41,7 @@ public class SpScPipeTest { Merger<Object> portSource = new Merger<Object>(); OutputPort<Object> sourcePort = portSource.getOutputPort(); InputPort<Object> targetPort = portSource.getNewInputPort(); - AbstractInterThreadPipe pipe = new SpScPipe(sourcePort, targetPort, 1); // IPipe does not provide getSignal method + AbstractSynchedPipe pipe = new BoundedSynchedPipe(sourcePort, targetPort, 1); // IPipe does not provide getSignal method List<ISignal> signals = new ArrayList<ISignal>(); signals.add(new StartingSignal()); @@ -71,7 +71,7 @@ public class SpScPipeTest { @Test(expected = IllegalArgumentException.class) public void testAdd() throws Exception { - SpScPipe pipe = new SpScPipe(null, null, 4); + BoundedSynchedPipe pipe = new BoundedSynchedPipe(null, null, 4); assertFalse(pipe.add(null)); } } diff --git a/src/test/java/teetime/framework/pipe/SingleElementPipeTest.java b/src/test/java/teetime/framework/pipe/UnsynchedPipeTest.java similarity index 90% rename from src/test/java/teetime/framework/pipe/SingleElementPipeTest.java rename to src/test/java/teetime/framework/pipe/UnsynchedPipeTest.java index 03978f0ea54d2b1a5c1b2dddcaac3944ce8b0502..71846916f181002cdc8fc231d40283d59d86bf2f 100644 --- a/src/test/java/teetime/framework/pipe/SingleElementPipeTest.java +++ b/src/test/java/teetime/framework/pipe/UnsynchedPipeTest.java @@ -19,11 +19,11 @@ import static org.junit.Assert.assertFalse; import org.junit.Test; -public class SingleElementPipeTest { +public class UnsynchedPipeTest { @Test(expected = IllegalArgumentException.class) public void testAdd() throws Exception { - SingleElementPipe pipe = new SingleElementPipe(null, null); + UnsynchedPipe pipe = new UnsynchedPipe(null, null); assertFalse(pipe.add(null)); }