Skip to content
Snippets Groups Projects
Commit 54d45049 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

solved bugs reported in #256

parent 9a8f46b3
No related branches found
No related tags found
No related merge requests found
......@@ -22,12 +22,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.Traverser.VisitorBehavior;
import teetime.framework.pipe.BoundedSynchedPipe;
import teetime.framework.pipe.DummyPipe;
import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.InstantiationPipe;
import teetime.framework.pipe.UnsynchedPipe;
import teetime.framework.pipe.BoundedSynchedPipe;
import teetime.framework.pipe.UnboundedSynchedPipe;
import teetime.framework.pipe.UnsynchedPipe;
/**
* Automatically instantiates the correct pipes
......@@ -67,12 +67,12 @@ class A3PipeInstantiation implements ITraverserVisitor {
if (targetStageThread != null && sourceStageThread != targetStageThread) {
// inter
if (pipe.capacity() != 0) {
new UnboundedSynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort());
new BoundedSynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Connected (bounded) " + pipe.getSourcePort() + " and " + pipe.getTargetPort());
}
} else {
new BoundedSynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort(), pipe.capacity());
new UnboundedSynchedPipe<T>(pipe.getSourcePort(), pipe.getTargetPort());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Connected (unbounded) " + pipe.getSourcePort() + " and " + pipe.getTargetPort());
}
......
......@@ -66,7 +66,7 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
}
@Override
public final int capacity() {
public int capacity() {
return capacity;
}
......
......@@ -285,6 +285,9 @@ public abstract class AbstractStage {
*/
@SuppressWarnings("PMD.SignatureDeclareThrowsException")
public void onStarting() throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Stage " + getId() + " within thread " + getOwningThread().getId());
}
changeState(StageState.STARTED);
calledOnStarting = true;
}
......
......@@ -22,7 +22,7 @@ import teetime.framework.StageState;
import teetime.framework.exceptionHandling.TerminateException;
import teetime.util.framework.concurrent.queue.ObservableSpScArrayQueue;
public class BoundedSynchedPipe<T> extends AbstractSynchedPipe<T>implements IMonitorablePipe {
public class BoundedSynchedPipe<T> extends AbstractSynchedPipe<T> implements IMonitorablePipe {
// private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class);
......@@ -59,6 +59,11 @@ public class BoundedSynchedPipe<T> extends AbstractSynchedPipe<T>implements IMon
return true;
}
@Override
public int capacity() {
return this.queue.capacity();
};
@Override
public boolean addNonBlocking(final Object element) {
return this.queue.offer(element);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment