Skip to content
Snippets Groups Projects
Commit 7c003dcd authored by Christian Wulf's avatar Christian Wulf
Browse files

Merge branch 'removed-ipipe' into 'master'

remove IPipeFactory and fixed resulting issues

fixes #232

The pipes are also renamed

See merge request !65
parents b521fd35 9ffb56c6
No related branches found
No related tags found
No related merge requests found
Showing
with 41 additions and 234 deletions
......@@ -24,11 +24,10 @@ import org.slf4j.LoggerFactory;
import teetime.framework.Traverser.VisitorBehavior;
import teetime.framework.pipe.DummyPipe;
import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.InstantiationPipe;
import teetime.framework.pipe.SingleElementPipeFactory;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.pipe.UnboundedSpScPipeFactory;
import teetime.framework.pipe.UnsynchedPipe;
import teetime.framework.pipe.BoundedSynchedPipe;
import teetime.framework.pipe.UnboundedSynchedPipe;
/**
* Automatically instantiates the correct pipes
......@@ -37,10 +36,6 @@ class A3PipeInstantiation implements ITraverserVisitor {
private static final Logger LOGGER = LoggerFactory.getLogger(A3PipeInstantiation.class);
private static final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory();
private static final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory();
private static final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory();
private final Set<IPipe<?>> visitedPipes = new HashSet<IPipe<?>>();
@Override
......@@ -72,19 +67,19 @@ class A3PipeInstantiation implements ITraverserVisitor {
if (targetStageThread != null && sourceStageThread != targetStageThread) {
// inter
if (pipe.capacity() != 0) {
interBoundedThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity());
new UnboundedSynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Connected (bounded) " + pipe.getSourcePort() + " and " + pipe.getTargetPort());
}
} else {
interUnboundedThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), 4);
new BoundedSynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Connected (unbounded) " + pipe.getSourcePort() + " and " + pipe.getTargetPort());
}
}
} else {
// normal or reflexive pipe => intra
intraThreadPipeFactory.create(pipe.getSourcePort(), pipe.getTargetPort(), 4);
new UnsynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Connected (unsynch) " + pipe.getSourcePort() + " and " + pipe.getTargetPort());
}
......
......@@ -31,13 +31,13 @@ import teetime.util.framework.concurrent.queue.putstrategy.YieldPutStrategy;
import teetime.util.framework.concurrent.queue.takestrategy.SCParkTakeStrategy;
import teetime.util.framework.concurrent.queue.takestrategy.TakeStrategy;
public abstract class AbstractInterThreadPipe<T> extends AbstractPipe<T> {
public abstract class AbstractSynchedPipe<T> extends AbstractPipe<T> {
private final BlockingQueue<ISignal> signalQueue;
private volatile boolean closed;
protected AbstractInterThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
protected AbstractSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort, capacity);
final Queue<ISignal> localSignalQueue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT));
final PutStrategy<ISignal> putStrategy = new YieldPutStrategy<ISignal>();
......
......@@ -17,11 +17,11 @@ package teetime.framework;
import teetime.framework.signal.ISignal;
public abstract class AbstractIntraThreadPipe<T> extends AbstractPipe<T> {
public abstract class AbstractUnsynchedPipe<T> extends AbstractPipe<T> {
private boolean closed;
protected AbstractIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
protected AbstractUnsynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort, capacity);
}
......
......@@ -19,18 +19,14 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.InstantiationPipe;
import teetime.framework.pipe.SingleElementPipeFactory;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.pipe.UnboundedSpScPipeFactory;
import teetime.framework.pipe.UnsynchedPipe;
import teetime.framework.pipe.BoundedSynchedPipe;
import teetime.framework.pipe.UnboundedSynchedPipe;
class ExecutionInstantiation {
private static final int DEFAULT_COLOR = 0;
private static final IPipeFactory interBoundedThreadPipeFactory = new SpScPipeFactory();
private static final IPipeFactory interUnboundedThreadPipeFactory = new UnboundedSpScPipeFactory();
private static final IPipeFactory intraThreadPipeFactory = new SingleElementPipeFactory();
private final ConfigurationContext context;
......@@ -87,9 +83,9 @@ class ExecutionInstantiation {
if (threadableStages.contains(targetStage) && targetColor != color) {
if (pipe.capacity() != 0) {
interBoundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), pipe.capacity());
new BoundedSynchedPipe(outputPort, pipe.getTargetPort(), pipe.capacity());
} else {
interUnboundedThreadPipeFactory.create(outputPort, pipe.getTargetPort(), 4);
new UnboundedSynchedPipe(outputPort, pipe.getTargetPort());
}
numCreatedConnections = 0;
} else {
......@@ -99,7 +95,7 @@ class ExecutionInstantiation {
// (but not its "headstage")
}
}
intraThreadPipeFactory.create(outputPort, pipe.getTargetPort());
new UnsynchedPipe(outputPort, pipe.getTargetPort());
colors.put(targetStage, color);
numCreatedConnections = colorAndConnectStages(targetStage);
}
......
......@@ -15,14 +15,14 @@
*/
package teetime.framework.pipe;
import teetime.framework.AbstractInterThreadPipe;
import teetime.framework.AbstractSynchedPipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.StageState;
import teetime.framework.exceptionHandling.TerminateException;
import teetime.util.framework.concurrent.queue.ObservableSpScArrayQueue;
class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe {
public class BoundedSynchedPipe<T> extends AbstractSynchedPipe<T>implements IMonitorablePipe {
// private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class);
......@@ -30,11 +30,15 @@ class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe
// statistics
private int numWaits;
SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
public BoundedSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort, capacity);
this.queue = new ObservableSpScArrayQueue<Object>(capacity);
}
public BoundedSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
this(sourcePort, targetPort, 4);
}
// BETTER introduce a QueueIsFullStrategy
@Override
public boolean add(final Object element) {
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
/**
* Represents the interface, which is must be defined in every PipeFactory
*/
public interface IPipeFactory {
/**
* Connects two stages with a pipe of default capacity.
*
* @param sourcePort
* OutputPort of the stage, which produces data.
* @param targetPort
* Input port of the receiving stage.
* @param <T>
* type of elements which traverse this pipe
*
* @return The connecting pipe.
*/
<T> IPipe<T> create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort);
/**
* Connects two stages with a pipe.
*
* @param sourcePort
* OutputPort of the stage, which produces data.
* @param targetPort
* Input port of the receiving stage.
* @param capacity
* Number of elements the pipe can carry.
* @param <T>
* type of elements which traverse this pipe
* @return The connecting pipe.
*/
<T> IPipe<T> create(OutputPort<? extends T> sourcePort, InputPort<T> targetPort, int capacity);
/**
* @return Whether or not the created pipes are growable
*/
boolean isGrowable();
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
public final class SingleElementPipeFactory implements IPipeFactory {
@Override
public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return this.create(sourcePort, targetPort, 1);
}
/**
* Hint: The capacity for this pipe implementation is ignored.
* <p>
* {@inheritDoc}
*/
@Override
public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return new SingleElementPipe<T>(sourcePort, targetPort);
}
@Override
public boolean isGrowable() {
return false;
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
public final class SpScPipeFactory implements IPipeFactory {
@Override
public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return this.create(sourcePort, targetPort, 4);
}
@Override
public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return new SpScPipe<T>(sourcePort, targetPort, capacity);
}
@Override
public boolean isGrowable() {
return false;
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
public class UnboundedSpScPipeFactory implements IPipeFactory {
@Override
public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return this.create(sourcePort, targetPort, 0);
}
/**
* {@inheritDoc}
*
* The capacity is ignored.
*/
@Override
public <T> IPipe<T> create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return new UnboundedSpScPipe<T>(sourcePort, targetPort);
}
@Override
public boolean isGrowable() {
return true;
}
}
......@@ -22,15 +22,15 @@ import org.jctools.queues.spec.ConcurrentQueueSpec;
import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference;
import teetime.framework.AbstractInterThreadPipe;
import teetime.framework.AbstractSynchedPipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
final class UnboundedSpScPipe<T> extends AbstractInterThreadPipe<T> {
public final class UnboundedSynchedPipe<T> extends AbstractSynchedPipe<T> {
private final Queue<Object> queue;
UnboundedSpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
public UnboundedSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
super(sourcePort, targetPort, Integer.MAX_VALUE);
ConcurrentQueueSpec specification = new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT);
this.queue = QueueFactory.newQueue(specification);
......
......@@ -15,15 +15,15 @@
*/
package teetime.framework.pipe;
import teetime.framework.AbstractIntraThreadPipe;
import teetime.framework.AbstractUnsynchedPipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
final class SingleElementPipe<T> extends AbstractIntraThreadPipe<T> {
public final class UnsynchedPipe<T> extends AbstractUnsynchedPipe<T> {
private Object element;
SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
public UnsynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
super(sourcePort, targetPort, 1);
}
......
......@@ -21,14 +21,12 @@ import java.util.List;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.RuntimeServiceFacade;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.pipe.BoundedSynchedPipe;
import teetime.framework.signal.StartingSignal;
import teetime.util.framework.port.PortAction;
public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> {
private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory();
private final InputPort<T> inputPort;
private final List<PortActionListener<T>> listeners = new ArrayList<PortActionListener<T>>();
......@@ -46,7 +44,7 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> {
}
private void processOutputPort(final DynamicDistributor<T> dynamicDistributor, final OutputPort<T> newOutputPort) {
INTER_THREAD_PIPE_FACTORY.create(newOutputPort, inputPort);
new BoundedSynchedPipe<T>(newOutputPort, inputPort);
RuntimeServiceFacade.INSTANCE.startWithinNewThread(dynamicDistributor, inputPort.getOwningStage());
......
......@@ -17,13 +17,11 @@ package teetime.stage.basic.merger.dynamic;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.pipe.BoundedSynchedPipe;
import teetime.util.framework.port.PortAction;
public class CreatePortAction<T> implements PortAction<DynamicMerger<T>> {
private static final SpScPipeFactory INTER_THREAD_PIPE_FACTORY = new SpScPipeFactory();
private final OutputPort<T> outputPort;
public CreatePortAction(final OutputPort<T> outputPort) {
......@@ -38,6 +36,6 @@ public class CreatePortAction<T> implements PortAction<DynamicMerger<T>> {
}
private void onInputPortCreated(final InputPort<T> newInputPort) {
INTER_THREAD_PIPE_FACTORY.create(outputPort, newInputPort);
new BoundedSynchedPipe<T>(outputPort, newInputPort);
}
}
......@@ -19,7 +19,7 @@ import java.util.ArrayList;
import java.util.List;
import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.SpScPipeFactory;
import teetime.framework.pipe.BoundedSynchedPipe;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
......@@ -38,7 +38,7 @@ public class RunnableConsumerStageTestConfiguration extends Configuration {
collectorSink.declareActive();
// Can not use createPorts, as the if condition above will lead to an exception
IPipe pipe = new SpScPipeFactory().create(producer.getOutputPort(), collectorSink.getInputPort());
IPipe pipe = new BoundedSynchedPipe(producer.getOutputPort(), collectorSink.getInputPort());
registerCustomPipe((AbstractPipe<?>) pipe);
this.collectorSink = collectorSink;
......
......@@ -23,7 +23,7 @@ import java.util.List;
import org.junit.Test;
import teetime.framework.AbstractInterThreadPipe;
import teetime.framework.AbstractSynchedPipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.signal.ISignal;
......@@ -32,7 +32,7 @@ import teetime.framework.signal.TerminatingSignal;
import teetime.framework.signal.ValidatingSignal;
import teetime.stage.basic.merger.Merger;
public class SpScPipeTest {
public class BoundedSynchedPipeTest {
// @Ignore
// ignore as long as this test passes null ports to SpScPipe
......@@ -41,7 +41,7 @@ public class SpScPipeTest {
Merger<Object> portSource = new Merger<Object>();
OutputPort<Object> sourcePort = portSource.getOutputPort();
InputPort<Object> targetPort = portSource.getNewInputPort();
AbstractInterThreadPipe pipe = new SpScPipe(sourcePort, targetPort, 1); // IPipe does not provide getSignal method
AbstractSynchedPipe pipe = new BoundedSynchedPipe(sourcePort, targetPort, 1); // IPipe does not provide getSignal method
List<ISignal> signals = new ArrayList<ISignal>();
signals.add(new StartingSignal());
......@@ -71,7 +71,7 @@ public class SpScPipeTest {
@Test(expected = IllegalArgumentException.class)
public void testAdd() throws Exception {
SpScPipe pipe = new SpScPipe(null, null, 4);
BoundedSynchedPipe pipe = new BoundedSynchedPipe(null, null, 4);
assertFalse(pipe.add(null));
}
}
......@@ -19,11 +19,11 @@ import static org.junit.Assert.assertFalse;
import org.junit.Test;
public class SingleElementPipeTest {
public class UnsynchedPipeTest {
@Test(expected = IllegalArgumentException.class)
public void testAdd() throws Exception {
SingleElementPipe pipe = new SingleElementPipe(null, null);
UnsynchedPipe pipe = new UnsynchedPipe(null, null);
assertFalse(pipe.add(null));
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment