Skip to content
Snippets Groups Projects
Commit 1c315ed8 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

added waitForInitializingSignal method

parent 84e5a198
No related branches found
No related tags found
No related merge requests found
...@@ -24,6 +24,7 @@ import org.jctools.queues.spec.Ordering; ...@@ -24,6 +24,7 @@ import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference; import org.jctools.queues.spec.Preference;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.signal.InitializingSignal;
import teetime.util.concurrent.queue.PCBlockingQueue; import teetime.util.concurrent.queue.PCBlockingQueue;
import teetime.util.concurrent.queue.putstrategy.PutStrategy; import teetime.util.concurrent.queue.putstrategy.PutStrategy;
import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy; import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
...@@ -69,6 +70,15 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { ...@@ -69,6 +70,15 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe {
cachedTargetStage.onSignal(signal, getTargetPort()); cachedTargetStage.onSignal(signal, getTargetPort());
} }
@Override
public final void waitForInitializingSignal() throws InterruptedException {
final ISignal signal = signalQueue.take();
if (!(signal instanceof InitializingSignal)) {
throw new IllegalStateException("Expected InitializingSignal, but was not the first arriving signal");
}
cachedTargetStage.onSignal(signal, getTargetPort());
}
@Override @Override
public final boolean isClosed() { public final boolean isClosed() {
return isClosed; return isClosed;
......
...@@ -56,4 +56,10 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe { ...@@ -56,4 +56,10 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe {
public void waitForStartSignal() throws InterruptedException { public void waitForStartSignal() throws InterruptedException {
// do nothing // do nothing
} }
@SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract")
@Override
public void waitForInitializingSignal() throws InterruptedException {
// do nothing
}
} }
...@@ -38,4 +38,8 @@ public final class InputPort<T> extends AbstractPort<T> { ...@@ -38,4 +38,8 @@ public final class InputPort<T> extends AbstractPort<T> {
pipe.waitForStartSignal(); pipe.waitForStartSignal();
} }
public void waitForInitializingSignal() throws InterruptedException {
pipe.waitForInitializingSignal();
};
} }
...@@ -43,14 +43,14 @@ final class RunnableConsumerStage extends AbstractRunnableStage { ...@@ -43,14 +43,14 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
@SuppressWarnings("PMD.GuardLogStatement") @SuppressWarnings("PMD.GuardLogStatement")
@Override @Override
protected void beforeStageExecution(final Stage stage) throws InterruptedException { protected void beforeStageExecution(final Stage stage) throws InterruptedException {
logger.trace("Waiting for start signals..." + inputPorts); logger.trace("Waiting for start signals... " + stage);
for (InputPort<?> inputPort : inputPorts) { for (InputPort<?> inputPort : inputPorts) {
inputPort.waitForStartSignal(); inputPort.waitForInitializingSignal();
} }
for (InputPort<?> inputPort : inputPorts) { for (InputPort<?> inputPort : inputPorts) {
inputPort.waitForStartSignal(); inputPort.waitForStartSignal();
} }
logger.trace("Starting..." + stage); logger.trace("Starting... " + stage);
} }
@Override @Override
......
...@@ -86,6 +86,11 @@ public final class DummyPipe implements IPipe { ...@@ -86,6 +86,11 @@ public final class DummyPipe implements IPipe {
} }
@Override
public void waitForInitializingSignal() throws InterruptedException {
}
@Override @Override
public void close() { public void close() {
......
...@@ -92,6 +92,8 @@ public interface IPipe { ...@@ -92,6 +92,8 @@ public interface IPipe {
void waitForStartSignal() throws InterruptedException; void waitForStartSignal() throws InterruptedException;
void waitForInitializingSignal() throws InterruptedException;
void close(); void close();
} }
...@@ -102,6 +102,9 @@ class MergerTestingPipe implements IPipe { ...@@ -102,6 +102,9 @@ class MergerTestingPipe implements IPipe {
@Override @Override
public void waitForStartSignal() throws InterruptedException {} public void waitForStartSignal() throws InterruptedException {}
@Override
public void waitForInitializingSignal() throws InterruptedException {}
@Override @Override
public void close() {} public void close() {}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment