From c602162daf9c8565b757f7de381226b6bac125fd Mon Sep 17 00:00:00 2001 From: Nelson Tavares de Sousa <stu103017@mail.uni-kiel.de> Date: Thu, 1 Oct 2015 13:41:31 +0200 Subject: [PATCH] remove IPipe and fixed resulting issues --- .../framework/A3PipeInstantiation.java | 17 ++---- .../framework/ExecutionInstantiation.java | 16 ++--- .../teetime/framework/pipe/IPipeFactory.java | 60 ------------------- .../framework/pipe/SingleElementPipe.java | 4 +- .../pipe/SingleElementPipeFactory.java | 43 ------------- .../java/teetime/framework/pipe/SpScPipe.java | 8 ++- .../framework/pipe/SpScPipeFactory.java | 38 ------------ .../framework/pipe/UnboundedSpScPipe.java | 4 +- .../pipe/UnboundedSpScPipeFactory.java | 43 ------------- .../distributor/dynamic/CreatePortAction.java | 6 +- .../merger/dynamic/CreatePortAction.java | 6 +- ...unnableConsumerStageTestConfiguration.java | 4 +- 12 files changed, 28 insertions(+), 221 deletions(-) delete mode 100644 src/main/java/teetime/framework/pipe/IPipeFactory.java delete mode 100644 src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java delete mode 100644 src/main/java/teetime/framework/pipe/SpScPipeFactory.java delete mode 100644 src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java diff --git a/src/main/java/teetime/framework/A3PipeInstantiation.java b/src/main/java/teetime/framework/A3PipeInstantiation.java index c11b6f23..08afc60d 100644 --- a/src/main/java/teetime/framework/A3PipeInstantiation.java +++ b/src/main/java/teetime/framework/A3PipeInstantiation.java @@ -24,11 +24,10 @@ import org.slf4j.LoggerFactory; import teetime.framework.Traverser.VisitorBehavior; import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.IPipe; -import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.InstantiationPipe; -import teetime.framework.pipe.SingleElementPipeFactory; -import teetime.framework.pipe.SpScPipeFactory; -import teetime.framework.pipe.UnboundedSpScPipeFactory; +import teetime.framework.pipe.SingleElementPipe; +import teetime.framework.pipe.SpScPipe; +import teetime.framework.pipe.UnboundedSpScPipe; /** * Automatically instantiates the correct pipes @@ -37,10 +36,6 @@ class A3PipeInstantiation implements ITraverserVisitor { private static final Logger LOGGER = LoggerFactory.getLogger(A3PipeInstantiation.class); - private static final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); - private static final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); - private static final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); - private final Set<IPipe<?>> visitedPipes = new HashSet<IPipe<?>>(); @Override @@ -72,19 +67,19 @@ class A3PipeInstantiation implements ITraverserVisitor { if (targetStageThread != null && sourceStageThread != targetStageThread) { // inter if (pipe.capacity() != 0) { - interBoundedThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity()); + new UnboundedSpScPipe<T>(pipe.getSourcePort(), pipe.getTargetPort()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connected (bounded) " + pipe.getSourcePort() + " and " + pipe.getTargetPort()); } } else { - interUnboundedThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), 4); + new SpScPipe<T>(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connected (unbounded) " + pipe.getSourcePort() + " and " + pipe.getTargetPort()); } } } else { // normal or reflexive pipe => intra - intraThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), 4); + new SingleElementPipe<T>(pipe.getSourcePort(), pipe.getTargetPort()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connected (unsynch) " + pipe.getSourcePort() + " and " + pipe.getTargetPort()); } diff --git a/src/main/java/teetime/framework/ExecutionInstantiation.java b/src/main/java/teetime/framework/ExecutionInstantiation.java index d9bf4fa4..75a70ad5 100644 --- a/src/main/java/teetime/framework/ExecutionInstantiation.java +++ b/src/main/java/teetime/framework/ExecutionInstantiation.java @@ -19,18 +19,14 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; -import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.InstantiationPipe; -import teetime.framework.pipe.SingleElementPipeFactory; -import teetime.framework.pipe.SpScPipeFactory; -import teetime.framework.pipe.UnboundedSpScPipeFactory; +import teetime.framework.pipe.SingleElementPipe; +import teetime.framework.pipe.SpScPipe; +import teetime.framework.pipe.UnboundedSpScPipe; class ExecutionInstantiation { private static final int DEFAULT_COLOR = 0; - private static final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory(); - private static final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory(); - private static final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory(); private final ConfigurationContext context; @@ -87,9 +83,9 @@ class ExecutionInstantiation { if (threadableStages.contains(targetStage) && targetColor != color) { if (pipe.capacity() != 0) { - interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.capacity()); + new SpScPipe(outputPort, pipe.getTargetPort(), pipe.capacity()); } else { - interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4); + new UnboundedSpScPipe(outputPort, pipe.getTargetPort()); } numCreatedConnections = 0; } else { @@ -98,7 +94,7 @@ class ExecutionInstantiation { throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage") } } - intraThreadPipeFactory.create(outputPort, pipe.getTargetPort()); + new SingleElementPipe(outputPort, pipe.getTargetPort()); colors.put(targetStage, color); numCreatedConnections = colorAndConnectStages(targetStage); } diff --git a/src/main/java/teetime/framework/pipe/IPipeFactory.java b/src/main/java/teetime/framework/pipe/IPipeFactory.java deleted file mode 100644 index 12eff9f1..00000000 --- a/src/main/java/teetime/framework/pipe/IPipeFactory.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.pipe; - -import teetime.framework.InputPort; -import teetime.framework.OutputPort; - -/** - * Represents the interface, which is must be defined in every PipeFactory - */ -public interface IPipeFactory { - - /** - * Connects two stages with a pipe of default capacity. - * - * @param sourcePort - * OutputPort of the stage, which produces data. - * @param targetPort - * Input port of the receiving stage. - * @param <T> - * type of elements which traverse this pipe - * - * @return The connecting pipe. - */ - <T> IPipe<T> create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort); - - /** - * Connects two stages with a pipe. - * - * @param sourcePort - * OutputPort of the stage, which produces data. - * @param targetPort - * Input port of the receiving stage. - * @param capacity - * Number of elements the pipe can carry. - * @param <T> - * type of elements which traverse this pipe - * @return The connecting pipe. - */ - <T> IPipe<T> create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort, int capacity); - - /** - * @return Whether or not the created pipes are growable - */ - boolean isGrowable(); - -} diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipe.java b/src/main/java/teetime/framework/pipe/SingleElementPipe.java index 6a632983..4ff42560 100644 --- a/src/main/java/teetime/framework/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/framework/pipe/SingleElementPipe.java @@ -19,11 +19,11 @@ import teetime.framework.AbstractIntraThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; -final class SingleElementPipe<T> extends AbstractIntraThreadPipe<T> { +public final class SingleElementPipe<T> extends AbstractIntraThreadPipe<T> { private Object element; - SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + public SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { super(sourcePort, targetPort, 1); } diff --git a/src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java b/src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java deleted file mode 100644 index 4b81cdb1..00000000 --- a/src/main/java/teetime/framework/pipe/SingleElementPipeFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.pipe; - -import teetime.framework.InputPort; -import teetime.framework.OutputPort; - -public final class SingleElementPipeFactory implements IPipeFactory { - - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - return this.create(sourcePort, targetPort, 1); - } - - /** - * Hint: The capacity for this pipe implementation is ignored. - * <p> - * {@inheritDoc} - */ - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - return new SingleElementPipe<T>(sourcePort, targetPort); - } - - @Override - public boolean isGrowable() { - return false; - } - -} diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index 98fc125b..299e9eaa 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -22,7 +22,7 @@ import teetime.framework.StageState; import teetime.framework.exceptionHandling.TerminateException; import teetime.util.framework.concurrent.queue.ObservableSpScArrayQueue; -class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe { +public class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe { // private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class); @@ -30,11 +30,15 @@ class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe // statistics private int numWaits; - SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + public SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { super(sourcePort, targetPort, capacity); this.queue = new ObservableSpScArrayQueue<Object>(capacity); } + public SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + this(sourcePort, targetPort, 4); + } + // BETTER introduce a QueueIsFullStrategy @Override public boolean add(final Object element) { diff --git a/src/main/java/teetime/framework/pipe/SpScPipeFactory.java b/src/main/java/teetime/framework/pipe/SpScPipeFactory.java deleted file mode 100644 index 1b37ec16..00000000 --- a/src/main/java/teetime/framework/pipe/SpScPipeFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.pipe; - -import teetime.framework.InputPort; -import teetime.framework.OutputPort; - -public final class SpScPipeFactory implements IPipeFactory { - - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - return this.create(sourcePort, targetPort, 4); - } - - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - return new SpScPipe<T>(sourcePort, targetPort, capacity); - } - - @Override - public boolean isGrowable() { - return false; - } - -} diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java index a785cd37..623b2e80 100644 --- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java +++ b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java @@ -26,11 +26,11 @@ import teetime.framework.AbstractInterThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; -final class UnboundedSpScPipe<T> extends AbstractInterThreadPipe<T> { +public final class UnboundedSpScPipe<T> extends AbstractInterThreadPipe<T> { private final Queue<Object> queue; - UnboundedSpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + public UnboundedSpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { super(sourcePort, targetPort, Integer.MAX_VALUE); ConcurrentQueueSpec specification = new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT); this.queue = QueueFactory.newQueue(specification); diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java deleted file mode 100644 index c3ce35ee..00000000 --- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipeFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.framework.pipe; - -import teetime.framework.InputPort; -import teetime.framework.OutputPort; - -public class UnboundedSpScPipeFactory implements IPipeFactory { - - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - return this.create(sourcePort, targetPort, 0); - } - - /** - * {@inheritDoc} - * - * The capacity is ignored. - */ - @Override - public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - return new UnboundedSpScPipe<T>(sourcePort, targetPort); - } - - @Override - public boolean isGrowable() { - return true; - } - -} diff --git a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java index fd7e64c6..02cba34f 100644 --- a/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/distributor/dynamic/CreatePortAction.java @@ -21,14 +21,12 @@ import java.util.List; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.RuntimeServiceFacade; -import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.pipe.SpScPipe; import teetime.framework.signal.StartingSignal; import teetime.util.framework.port.PortAction; public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { - private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory(); - private final InputPort<T> inputPort; private final List<PortActionListener<T>> listeners = new ArrayList<PortActionListener<T>>(); @@ -46,7 +44,7 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> { } private void processOutputPort(final DynamicDistributor<T> dynamicDistributor, final OutputPort<T> newOutputPort) { - INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort); + new SpScPipe<T>(newOutputPort, inputPort); RuntimeServiceFacade.INSTANCE.startWithinNewThread(dynamicDistributor, inputPort.getOwningStage()); diff --git a/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java b/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java index 5c387858..bce7550c 100644 --- a/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java +++ b/src/main/java/teetime/stage/basic/merger/dynamic/CreatePortAction.java @@ -17,13 +17,11 @@ package teetime.stage.basic.merger.dynamic; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.pipe.SpScPipe; import teetime.util.framework.port.PortAction; public class CreatePortAction<T> implements PortAction<DynamicMerger<T>> { - private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory(); - private final OutputPort<T> outputPort; public CreatePortAction(final OutputPort<T> outputPort) { @@ -38,6 +36,6 @@ public class CreatePortAction<T> implements PortAction<DynamicMerger<T>> { } private void onInputPortCreated(final InputPort<T> newInputPort) { - INTER_THREAD_PIPE_FACTORY.create(outputPort, newInputPort); + new SpScPipe<T>(outputPort, newInputPort); } } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index dd343622..40bdd787 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -19,7 +19,7 @@ import java.util.ArrayList; import java.util.List; import teetime.framework.pipe.IPipe; -import teetime.framework.pipe.SpScPipeFactory; +import teetime.framework.pipe.SpScPipe; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; @@ -38,7 +38,7 @@ public class RunnableConsumerStageTestConfiguration extends Configuration { collectorSink.declareActive(); // Can not use createPorts, as the if condition above will lead to an exception - IPipe pipe = new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort()); + IPipe pipe = new SpScPipe(producer.getOutputPort(), collectorSink.getInputPort()); registerCustomPipe((AbstractPipe<?>) pipe); this.collectorSink = collectorSink; -- GitLab