Skip to content
Snippets Groups Projects
Commit 8289481f authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

fixes #209 renamed pipes

parent c602162d
No related branches found
No related tags found
No related merge requests found
Showing
with 30 additions and 30 deletions
......@@ -25,9 +25,9 @@ import teetime.framework.Traverser.VisitorBehavior;
import teetime.framework.pipe.DummyPipe;
import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.InstantiationPipe;
import teetime.framework.pipe.SingleElementPipe;
import teetime.framework.pipe.SpScPipe;
import teetime.framework.pipe.UnboundedSpScPipe;
import teetime.framework.pipe.UnsynchedPipe;
import teetime.framework.pipe.BoundedSynchedPipe;
import teetime.framework.pipe.UnboundedSynchedPipe;
/**
* Automatically instantiates the correct pipes
......@@ -67,19 +67,19 @@ class A3PipeInstantiation implements ITraverserVisitor {
if (targetStageThread != null && sourceStageThread != targetStageThread) {
// inter
if (pipe.capacity() != 0) {
new UnboundedSpScPipe<T>(pipe.getSourcePort(), pipe.getTargetPort());
new UnboundedSynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Connected (bounded) " + pipe.getSourcePort() + " and " + pipe.getTargetPort());
}
} else {
new SpScPipe<T>(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity());
new BoundedSynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Connected (unbounded) " + pipe.getSourcePort() + " and " + pipe.getTargetPort());
}
}
} else {
// normal or reflexive pipe => intra
new SingleElementPipe<T>(pipe.getSourcePort(), pipe.getTargetPort());
new UnsynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Connected (unsynch) " + pipe.getSourcePort() + " and " + pipe.getTargetPort());
}
......
......@@ -20,9 +20,9 @@ import java.util.Map;
import java.util.Set;
import teetime.framework.pipe.InstantiationPipe;
import teetime.framework.pipe.SingleElementPipe;
import teetime.framework.pipe.SpScPipe;
import teetime.framework.pipe.UnboundedSpScPipe;
import teetime.framework.pipe.UnsynchedPipe;
import teetime.framework.pipe.BoundedSynchedPipe;
import teetime.framework.pipe.UnboundedSynchedPipe;
class ExecutionInstantiation {
......@@ -83,9 +83,9 @@ class ExecutionInstantiation {
if (threadableStages.contains(targetStage) && targetColor != color) {
if (pipe.capacity() != 0) {
new SpScPipe(outputPort, pipe.getTargetPort(), pipe.capacity());
new BoundedSynchedPipe(outputPort, pipe.getTargetPort(), pipe.capacity());
} else {
new UnboundedSpScPipe(outputPort, pipe.getTargetPort());
new UnboundedSynchedPipe(outputPort, pipe.getTargetPort());
}
numCreatedConnections = 0;
} else {
......@@ -94,7 +94,7 @@ class ExecutionInstantiation {
throw new IllegalStateException("Crossing threads"); // One stage is connected to a stage of another thread (but not its "headstage")
}
}
new SingleElementPipe(outputPort, pipe.getTargetPort());
new UnsynchedPipe(outputPort, pipe.getTargetPort());
colors.put(targetStage, color);
numCreatedConnections = colorAndConnectStages(targetStage);
}
......
......@@ -22,7 +22,7 @@ import teetime.framework.StageState;
import teetime.framework.exceptionHandling.TerminateException;
import teetime.util.framework.concurrent.queue.ObservableSpScArrayQueue;
public class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe {
public class BoundedSynchedPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorablePipe {
// private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class);
......@@ -30,12 +30,12 @@ public class SpScPipe<T> extends AbstractInterThreadPipe<T>implements IMonitorab
// statistics
private int numWaits;
public SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
public BoundedSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort, capacity);
this.queue = new ObservableSpScArrayQueue<Object>(capacity);
}
public SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
public BoundedSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
this(sourcePort, targetPort, 4);
}
......
......@@ -26,11 +26,11 @@ import teetime.framework.AbstractInterThreadPipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
public final class UnboundedSpScPipe<T> extends AbstractInterThreadPipe<T> {
public final class UnboundedSynchedPipe<T> extends AbstractInterThreadPipe<T> {
private final Queue<Object> queue;
public UnboundedSpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
public UnboundedSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
super(sourcePort, targetPort, Integer.MAX_VALUE);
ConcurrentQueueSpec specification = new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT);
this.queue = QueueFactory.newQueue(specification);
......
......@@ -19,11 +19,11 @@ import teetime.framework.AbstractIntraThreadPipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
public final class SingleElementPipe<T> extends AbstractIntraThreadPipe<T> {
public final class UnsynchedPipe<T> extends AbstractIntraThreadPipe<T> {
private Object element;
public SingleElementPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
public UnsynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
super(sourcePort, targetPort, 1);
}
......
......@@ -21,7 +21,7 @@ import java.util.List;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.RuntimeServiceFacade;
import teetime.framework.pipe.SpScPipe;
import teetime.framework.pipe.BoundedSynchedPipe;
import teetime.framework.signal.StartingSignal;
import teetime.util.framework.port.PortAction;
......@@ -44,7 +44,7 @@ public class CreatePortAction<T> implements PortAction<DynamicDistributor<T>> {
}
private void processOutputPort(final DynamicDistributor<T> dynamicDistributor, final OutputPort<T> newOutputPort) {
new SpScPipe<T>(newOutputPort, inputPort);
new BoundedSynchedPipe<T>(newOutputPort, inputPort);
RuntimeServiceFacade.INSTANCE.startWithinNewThread(dynamicDistributor, inputPort.getOwningStage());
......
......@@ -17,7 +17,7 @@ package teetime.stage.basic.merger.dynamic;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.SpScPipe;
import teetime.framework.pipe.BoundedSynchedPipe;
import teetime.util.framework.port.PortAction;
public class CreatePortAction<T> implements PortAction<DynamicMerger<T>> {
......@@ -36,6 +36,6 @@ public class CreatePortAction<T> implements PortAction<DynamicMerger<T>> {
}
private void onInputPortCreated(final InputPort<T> newInputPort) {
new SpScPipe<T>(outputPort, newInputPort);
new BoundedSynchedPipe<T>(outputPort, newInputPort);
}
}
......@@ -19,7 +19,7 @@ import java.util.ArrayList;
import java.util.List;
import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.SpScPipe;
import teetime.framework.pipe.BoundedSynchedPipe;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
......@@ -38,7 +38,7 @@ public class RunnableConsumerStageTestConfiguration extends Configuration {
collectorSink.declareActive();
// Can not use createPorts, as the if condition above will lead to an exception
IPipe pipe = new SpScPipe(producer.getOutputPort(), collectorSink.getInputPort());
IPipe pipe = new BoundedSynchedPipe(producer.getOutputPort(), collectorSink.getInputPort());
registerCustomPipe((AbstractPipe<?>) pipe);
this.collectorSink = collectorSink;
......
......@@ -32,7 +32,7 @@ import teetime.framework.signal.TerminatingSignal;
import teetime.framework.signal.ValidatingSignal;
import teetime.stage.basic.merger.Merger;
public class SpScPipeTest {
public class BoundedSynchedPipeTest {
// @Ignore
// ignore as long as this test passes null ports to SpScPipe
......@@ -41,7 +41,7 @@ public class SpScPipeTest {
Merger<Object> portSource = new Merger<Object>();
OutputPort<Object> sourcePort = portSource.getOutputPort();
InputPort<Object> targetPort = portSource.getNewInputPort();
AbstractInterThreadPipe pipe = new SpScPipe(sourcePort, targetPort, 1); // IPipe does not provide getSignal method
AbstractInterThreadPipe pipe = new BoundedSynchedPipe(sourcePort, targetPort, 1); // IPipe does not provide getSignal method
List<ISignal> signals = new ArrayList<ISignal>();
signals.add(new StartingSignal());
......@@ -71,7 +71,7 @@ public class SpScPipeTest {
@Test(expected = IllegalArgumentException.class)
public void testAdd() throws Exception {
SpScPipe pipe = new SpScPipe(null, null, 4);
BoundedSynchedPipe pipe = new BoundedSynchedPipe(null, null, 4);
assertFalse(pipe.add(null));
}
}
......@@ -19,11 +19,11 @@ import static org.junit.Assert.assertFalse;
import org.junit.Test;
public class SingleElementPipeTest {
public class UnsynchedPipeTest {
@Test(expected = IllegalArgumentException.class)
public void testAdd() throws Exception {
SingleElementPipe pipe = new SingleElementPipe(null, null);
UnsynchedPipe pipe = new UnsynchedPipe(null, null);
assertFalse(pipe.add(null));
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment