Skip to content
Snippets Groups Projects
Commit 792a4d3b authored by Christian Wulf's avatar Christian Wulf
Browse files

Merge branch 'InitSignal-final' into 'master'

Init signal final

#154
Introduced semaphores in RunnableProducerStage which let it wait for Initializing- and StartingSignal.
The analysis starts all threads instantly in init() and sends an InitializingSignal.
All producer wait until they all get triggered.

See merge request !40
parents f42d157e f01000b1
No related branches found
No related tags found
No related merge requests found
Showing
with 152 additions and 30 deletions
......@@ -6,7 +6,9 @@
</properties>
<body>
<release version="Snapshot" date="Daily basis" description="Unstable preview of oncoming versions">
<action dev="ntd" type="add" issue="154">
All stages will be initialized before starting the analysis.
</action>
</release>
......
......@@ -24,6 +24,7 @@ import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference;
import teetime.framework.signal.ISignal;
import teetime.framework.signal.InitializingSignal;
import teetime.util.concurrent.queue.PCBlockingQueue;
import teetime.util.concurrent.queue.putstrategy.PutStrategy;
import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy;
......@@ -69,6 +70,15 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe {
cachedTargetStage.onSignal(signal, getTargetPort());
}
@Override
public final void waitForInitializingSignal() throws InterruptedException {
final ISignal signal = signalQueue.take();
if (!(signal instanceof InitializingSignal)) {
throw new IllegalStateException("Expected InitializingSignal, but was not the first arriving signal");
}
cachedTargetStage.onSignal(signal, getTargetPort());
}
@Override
public final boolean isClosed() {
return isClosed;
......
......@@ -56,4 +56,10 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe {
public void waitForStartSignal() throws InterruptedException {
// do nothing
}
@SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract")
@Override
public void waitForInitializingSignal() throws InterruptedException {
// do nothing
}
}
......@@ -25,7 +25,7 @@ abstract class AbstractRunnableStage implements Runnable {
private static final String TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION = "Terminating thread due to the following exception: ";
private final Stage stage;
protected final Stage stage;
@SuppressWarnings("PMD.LoggerIsNotStaticFinal")
protected final Logger logger;
......@@ -39,16 +39,16 @@ abstract class AbstractRunnableStage implements Runnable {
this.logger.debug("Executing runnable stage...");
boolean failed = false;
try {
beforeStageExecution(stage);
beforeStageExecution();
try {
do {
executeStage(stage);
executeStage();
} while (!stage.shouldBeTerminated());
} catch (StageException e) {
this.stage.terminate();
failed = true;
}
afterStageExecution(stage);
afterStageExecution();
} catch (RuntimeException e) {
this.logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e);
......@@ -72,10 +72,10 @@ abstract class AbstractRunnableStage implements Runnable {
}
protected abstract void beforeStageExecution(Stage stage) throws InterruptedException;
protected abstract void beforeStageExecution() throws InterruptedException;
protected abstract void executeStage(Stage stage);
protected abstract void executeStage();
protected abstract void afterStageExecution(Stage stage);
protected abstract void afterStageExecution();
}
......@@ -59,7 +59,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
private final Collection<Pair<Thread, Throwable>> exceptions = new ConcurrentLinkedQueue<Pair<Thread, Throwable>>();
private boolean initialized;
private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>();
/**
* Creates a new {@link Analysis} that skips validating the port connections and uses the default listener.
......@@ -116,10 +116,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
*
*/
private final void init() {
if (initialized) {
return;
}
initialized = true;
final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs();
if (threadableStageJobs.isEmpty()) {
......@@ -139,6 +135,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
}
case BY_SELF_DECISION: {
final RunnableProducerStage runnable = new RunnableProducerStage(stage);
producerRunnables.add(runnable);
thread = createThread(runnable, stage.getId());
this.finiteProducerThreads.add(thread);
InitializingSignal initializingSignal = new InitializingSignal();
......@@ -147,6 +144,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
}
case BY_INTERRUPT: {
final RunnableProducerStage runnable = new RunnableProducerStage(stage);
producerRunnables.add(runnable);
thread = createThread(runnable, stage.getId());
InitializingSignal initializingSignal = new InitializingSignal();
stage.onSignal(initializingSignal, null);
......@@ -161,6 +159,12 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
initializeIntraStages(intraStages, thread, newListener);
}
startThreads(this.consumerThreads);
startThreads(this.finiteProducerThreads);
startThreads(this.infiniteProducerThreads);
sendInitializingSignal();
}
private Thread createThread(final AbstractRunnableStage runnable, final String name) {
......@@ -249,9 +253,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
* @since 1.1
*/
public void executeNonBlocking() {
startThreads(this.consumerThreads);
startThreads(this.finiteProducerThreads);
startThreads(this.infiniteProducerThreads);
sendStartingSignal();
}
private void startThreads(final Iterable<Thread> threads) {
......@@ -260,6 +262,18 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
}
}
private void sendInitializingSignal() {
for (RunnableProducerStage runnable : producerRunnables) {
runnable.triggerInitializingSignal();
}
}
private void sendStartingSignal() {
for (RunnableProducerStage runnable : producerRunnables) {
runnable.triggerStartingSignal();
}
}
/**
* Retrieves the Configuration which was used to add and arrange all stages needed for the Analysis
*
......
......@@ -38,4 +38,8 @@ public final class InputPort<T> extends AbstractPort<T> {
pipe.waitForStartSignal();
}
public void waitForInitializingSignal() throws InterruptedException {
pipe.waitForInitializingSignal();
};
}
......@@ -42,19 +42,19 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
@SuppressWarnings("PMD.GuardLogStatement")
@Override
protected void beforeStageExecution(final Stage stage) throws InterruptedException {
logger.trace("Waiting for start signals..." + inputPorts);
protected void beforeStageExecution() throws InterruptedException {
logger.trace("Waiting for start signals... " + stage);
for (InputPort<?> inputPort : inputPorts) {
inputPort.waitForStartSignal();
inputPort.waitForInitializingSignal();
}
for (InputPort<?> inputPort : inputPorts) {
inputPort.waitForStartSignal();
}
logger.trace("Starting..." + stage);
logger.trace("Starting... " + stage);
}
@Override
protected void executeStage(final Stage stage) {
protected void executeStage() {
try {
stage.executeStage();
} catch (NotEnoughInputException e) {
......@@ -73,7 +73,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
}
@Override
protected void afterStageExecution(final Stage stage) {
protected void afterStageExecution() {
final ISignal signal = new TerminatingSignal();
for (InputPort<?> inputPort : inputPorts) {
stage.onSignal(signal, inputPort);
......
......@@ -15,30 +15,53 @@
*/
package teetime.framework;
import java.util.concurrent.Semaphore;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.TerminatingSignal;
public final class RunnableProducerStage extends AbstractRunnableStage {
final class RunnableProducerStage extends AbstractRunnableStage {
private final Semaphore startSemaphore = new Semaphore(0);
private final Semaphore initSemaphore = new Semaphore(0);
public RunnableProducerStage(final Stage stage) {
super(stage);
}
@Override
protected void beforeStageExecution(final Stage stage) {
final StartingSignal startingSignal = new StartingSignal();
stage.onSignal(startingSignal, null);
protected void beforeStageExecution() throws InterruptedException {
waitForInitializingSignal();
this.stage.onSignal(new InitializingSignal(), null);
waitForStartingSignal();
this.stage.onSignal(new StartingSignal(), null);
}
@Override
protected void executeStage(final Stage stage) {
stage.executeStage();
protected void executeStage() {
this.stage.executeStage();
}
@Override
protected void afterStageExecution(final Stage stage) {
protected void afterStageExecution() {
final TerminatingSignal terminatingSignal = new TerminatingSignal();
stage.onSignal(terminatingSignal, null);
this.stage.onSignal(terminatingSignal, null);
}
public void triggerInitializingSignal() {
initSemaphore.release();
}
public void triggerStartingSignal() {
startSemaphore.release();
}
public void waitForInitializingSignal() throws InterruptedException {
initSemaphore.acquire();
}
public void waitForStartingSignal() throws InterruptedException {
startSemaphore.acquire();
}
}
......@@ -86,6 +86,11 @@ public final class DummyPipe implements IPipe {
}
@Override
public void waitForInitializingSignal() throws InterruptedException {
}
@Override
public void close() {
......
......@@ -92,6 +92,8 @@ public interface IPipe {
void waitForStartSignal() throws InterruptedException;
void waitForInitializingSignal() throws InterruptedException;
void close();
}
package teetime.framework;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
public class RunnableProducerStageTest {
@Test
public void testInit() {
RunnableTestStage testStage = new RunnableTestStage();
RunnableProducerStage runnable = new RunnableProducerStage(testStage);
Thread thread = new Thread(runnable);
thread.start();
// Not running and not initialized
assertFalse(testStage.executed && testStage.initialized);
runnable.triggerInitializingSignal();
// Not running, but initialized
assertFalse(testStage.executed && !testStage.initialized);
runnable.triggerStartingSignal();
while (!testStage.shouldBeTerminated()) {
Thread.yield();
}
assertTrue(testStage.executed);
}
}
package teetime.framework;
class RunnableTestStage extends AbstractProducerStage<Object> {
boolean executed, initialized;
@Override
protected void executeStage() {
executed = true;
this.terminate();
}
@Override
protected void execute() {
}
@Override
public void onInitializing() throws Exception {
super.onInitializing();
initialized = true;
}
}
......@@ -102,6 +102,9 @@ class MergerTestingPipe implements IPipe {
@Override
public void waitForStartSignal() throws InterruptedException {}
@Override
public void waitForInitializingSignal() throws InterruptedException {}
@Override
public void close() {}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment