diff --git a/src/main/java/teetime/framework/A3PipeInstantiation.java b/src/main/java/teetime/framework/A3PipeInstantiation.java index c11b6f2385ad697a1752b2b57e2bbc0c6f831b06..08afc60dd4b0a9e47014bf73c40e8dd11a71ccf9 100644 --- a/src/main/java/teetime/framework/A3PipeInstantiation.java +++ b/src/main/java/teetime/framework/A3PipeInstantiation.java @@ -24,11 +24,10 @@ import org.slf4j.LoggerFactory; import teetime.framework.Traverser.VisitorBehavior; import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.IPipe; -import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.InstantiationPipe; -import teetime.framework.pipe.SingleElementPipeFactory; -import teetime.framework.pipe.SpScPipeFactory; -import teetime.framework.pipe.UnboundedSpScPipeFactory; +import teetime.framework.pipe.SingleElementPipe; +import teetime.framework.pipe.SpScPipe; +import teetime.framework.pipe.UnboundedSpScPipe; /** * Automatically instantiates the correct pipes @@ -37,10 +36,6 @@ class A3PipeInstantiation implements ITraverserVisitor { private static final Logger LOGGER = LoggerFactory.getLogger(A3PipeInstantiation.class); - private static final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); - private static final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); - private static final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); - private final Set<IPipe<?>> visitedPipes = new HashSet<IPipe<?>>(); @Override @@ -72,19 +67,19 @@ class A3PipeInstantiation implements ITraverserVisitor { if (targetStageThread != null && sourceStageThread != targetStageThread) { // inter if (pipe.capacity() != 0) { - interBoundedThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity()); + new UnboundedSpScPipe<T>(pipe.getSourcePort(), pipe.getTargetPort()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connected (bounded) " + pipe.getSourcePort() + " and " + pipe.getTargetPort()); } } else { - interUnboundedThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), 4); + new SpScPipe<T>(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connected (unbounded) " + pipe.getSourcePort() + " and " + pipe.getTargetPort()); } } } else { // normal or reflexive pipe => intra - intraThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), 4); + new SingleElementPipe<T>(pipe.getSourcePort(), pipe.getTargetPort()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connected (unsynch) " + pipe.getSourcePort() + " and " + pipe.getTargetPort()); } diff --git a/src/main/java/teetime/framework/ExecutionInstantiation.java b/src/main/java/teetime/framework/ExecutionInstantiation.java index d9bf4fa485727e922b8d539abc3851ee2538caef..75a70ad512e64eec62c204bcffd1c3e07ac928d2 100644 --- a/src/main/java/teetime/framework/ExecutionInstantiation.java +++ b/src/main/java/teetime/framework/ExecutionInstantiation.java @@ -19,18 +19,14 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; -import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.InstantiationPipe; -import teetime.framework.pipe.SingleElementPipeFactory; -import teetime.framework.pipe.SpScPipeFactory; -import teetime.framework.pipe.UnboundedSpScPipeFactory; +import teetime.framework.pipe.SingleElementPipe; +import teetime.framework.pipe.SpScPipe; +import teetime.framework.pipe.UnboundedSpScPipe; class ExecutionInstantiation { private static final int DEFAULT_COLOR = 0; - private static final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); - private static final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); - private static final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); private final ConfigurationContext context; @@ -87,9 +83,9 @@ class ExecutionInstantiation { if (threadableStages.contains(targetStage) && targetColor != color) { if (pipe.capacity() != 0) { - interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.capacity()); + new SpScPipe(outputPort, pipe.getTargetPort(), pipe.capacity()); } else { - interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4); + new UnboundedSpScPipe(outputPort, pipe.getTargetPort()); } numCreatedConnections = 0; } else { @@ -98,7 +94,7 @@ class ExecutionInstantiation { throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") } } - intraThreadPipeFactory.create(outputPort, pipe.getTargetPort()); + new SingleElementPipe(outputPort, pipe.getTargetPort()); colors.put(targetStage, color); numCreatedConnections = colorAndConnectStages(targetStage); } diff --git a/src/main/java/teetime/framework/pipe/IPipeFactory.java b/src/main/java/teetime/framework/pipe/IPipeFactory.java deleted file mode 100644 index 12eff9f17f8b22ab65493169ee61d5afe9e17eb1..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/pipe/IPipeFactory.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.pipe; - -import teetime.framework.InputPort; -import teetime.framework.OutputPort; - -/** - * Represents the interface, which is must be defined in every PipeFactory - */ -public interface IPipeFactory { - - /** - * Connects two stages with a pipe of default capacity. - * - * @param sourcePort - * OutputPort of the stage, which produces data. - * @param targetPort - * Input port of the receiving stage. - * @param <T> - * type of elements which traverse this pipe - * - * @return The connecting pipe. - */ - <T> IPipe<T> create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort); - - /** - * Connects two stages with a pipe. - * - * @param sourcePort - * OutputPort of the stage, which produces data. - * @param targetPort - * Input port of the receiving stage. - * @param capacity - * Number of elements the pipe can carry. - * @param <T> - * type of elements which traverse this pipe - * @return The connecting pipe. - */ - <T> IPipe<T> create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort, int capacity); - - /** - * @return Whether or not the created pipes are growable - */ - boolean isGrowable(); - -} diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipe.java b/src/main/java/teetime/framework/pipe/SingleElementPipe.java index 6a632983d7161e7d442d53c03f366d63747892b4..4ff42560e25cc6a2127ccf80939c3fb47846c5eb 100644 --- a/src/main/java/teetime/framework/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/framework/pipe/SingleElementPipe.java @@ -19,11 +19,11 @@ import teetime.framework.AbstractIntraThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; -final class SingleElementPipe<T> extends AbstractIntraThreadPipe<T> { +public final class SingleElementPipe<T> extends AbstractIntraThreadPipe<T> { private Object element; - SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + public SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { super(sourcePort, targetPort, 1); } diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java b/src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java deleted file mode 100644 index 4b81cdb1f9ffef8758fc2f5c74ba839d2037d37a..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.pipe; - -import teetime.framework.InputPort; -import teetime.framework.OutputPort; - -public final class SingleElementPipeFactory implements IPipeFactory { - - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - return this.create(sourcePort, targetPort, 1); - } - - /** - * Hint: The capacity for this pipe implementation is ignored. - * <p> - * {@inheritDoc} - */ - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - return new SingleElementPipe<T>(sourcePort, targetPort); - } - - @Override - public boolean isGrowable() { - return false; - } - -} diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index 98fc125b0464407b90c389420bfc0ebef9d09ec0..299e9eaa1f3e4fb798dbb55c70e5fd0cc01d90b4 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -22,7 +22,7 @@ import teetime.framework.StageState; import teetime.framework.exceptionHandling.TerminateException; import teetime.util.framework.concurrent.queue.ObservableSpScArrayQueue; -class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe { +public class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe { // private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class); @@ -30,11 +30,15 @@ class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe // statistics private int numWaits; - SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + public SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { super(sourcePort, targetPort, capacity); this.queue = new ObservableSpScArrayQueue<Object>(capacity); } + public SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + this(sourcePort, targetPort, 4); + } + // BETTER introduce a QueueIsFullStrategy @Override public boolean add(final Object element) { diff --git a/src/main/java/teetime/framework/pipe/SpScPipeFactory.java b/src/main/java/teetime/framework/pipe/SpScPipeFactory.java deleted file mode 100644 index 1b37ec169f777bc22d0113f32f494e4b00165961..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/pipe/SpScPipeFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.pipe; - -import teetime.framework.InputPort; -import teetime.framework.OutputPort; - -public final class SpScPipeFactory implements IPipeFactory { - - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - return this.create(sourcePort, targetPort, 4); - } - - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - return new SpScPipe<T>(sourcePort, targetPort, capacity); - } - - @Override - public boolean isGrowable() { - return false; - } - -} diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java index a785cd37d95bd6a0c6a62ec9a1e9315702b8d272..623b2e809eca1dfb74eca5be9dfabbff474af156 100644 --- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java +++ b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java @@ -26,11 +26,11 @@ import teetime.framework.AbstractInterThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; -final class UnboundedSpScPipe<T> extends AbstractInterThreadPipe<T> { +public final class UnboundedSpScPipe<T> extends AbstractInterThreadPipe<T> { private final Queue<Object> queue; - UnboundedSpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + public UnboundedSpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { super(sourcePort, targetPort, Integer.MAX_VALUE); ConcurrentQueueSpec specification = new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT); this.queue = QueueFactory.newQueue(specification); diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java deleted file mode 100644 index c3ce35ee93560af7287a01a690105768d95ba33f..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.pipe; - -import teetime.framework.InputPort; -import teetime.framework.OutputPort; - -public class UnboundedSpScPipeFactory implements IPipeFactory { - - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - return this.create(sourcePort, targetPort, 0); - } - - /** - * {@inheritDoc} - * - * The capacity is ignored. - */ - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - return new UnboundedSpScPipe<T>(sourcePort, targetPort); - } - - @Override - public boolean isGrowable() { - return true; - } - -} diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java index fd7e64c6fcbc9f6dde9381e87284d3b7cbef2a47..02cba34fcbe5db68e996a10503754c8be1f7e907 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java @@ -21,14 +21,12 @@ import java.util.List; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.RuntimeServiceFacade; -import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.pipe.SpScPipe; import teetime.framework.signal.StartingSignal; import teetime.util.framework.port.PortAction; public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { - private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory(); - private final InputPort<T> inputPort; private final List<PortActionListener<T>> listeners = new ArrayList<PortActionListener<T>>(); @@ -46,7 +44,7 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { } private void processOutputPort(final DynamicDistributor<T> dynamicDistributor, final OutputPort<T> newOutputPort) { - INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort); + new SpScPipe<T>(newOutputPort, inputPort); RuntimeServiceFacade.INSTANCE.startWithinNewThread(dynamicDistributor, inputPort.getOwningStage()); diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java index 5c3878587b11ad2d46f62eba00c156b86f4d0035..bce7550c3447bcf9a70e9becd9b7d2e652763d2d 100644 --- a/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java @@ -17,13 +17,11 @@ package teetime.stage.basic.merger.dynamic; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.pipe.SpScPipe; import teetime.util.framework.port.PortAction; public class CreatePortAction<T> implements PortAction<DynamicMerger<T>> { - private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory(); - private final OutputPort<T> outputPort; public CreatePortAction(final OutputPort<T> outputPort) { @@ -38,6 +36,6 @@ public class CreatePortAction<T> implements PortAction<DynamicMerger<T>> { } private void onInputPortCreated(final InputPort<T> newInputPort) { - INTER_THREAD_PIPE_FACTORY.create(outputPort, newInputPort); + new SpScPipe<T>(outputPort, newInputPort); } } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index dd34362233f949bcbf94c6bf9fc0181bfe323ca0..40bdd787851b2fd27753d9d4492ffea2ef867d2c 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -19,7 +19,7 @@ import java.util.ArrayList; import java.util.List; import teetime.framework.pipe.IPipe; -import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.pipe.SpScPipe; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; @@ -38,7 +38,7 @@ public class RunnableConsumerStageTestConfiguration extends Configuration { collectorSink.declareActive(); // Can not use createPorts, as the if condition above will lead to an exception - IPipe pipe = new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort()); + IPipe pipe = new SpScPipe(producer.getOutputPort(), collectorSink.getInputPort()); registerCustomPipe((AbstractPipe<?>) pipe); this.collectorSink = collectorSink;