diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index ea357146a290b441c56b5b2a3764e1a5b41c8b74..698af379f62df4e56eff0380a03f01064618d385 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -29,6 +29,7 @@ import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.exceptionHandling.IExceptionListenerFactory; import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory; import teetime.framework.signal.InitializingSignal; +import teetime.framework.signal.StartingSignal; import teetime.framework.signal.ValidatingSignal; import teetime.framework.validation.AnalysisNotValidException; import teetime.util.Pair; @@ -59,7 +60,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught private final Collection<Pair<Thread, Throwable>> exceptions = new ConcurrentLinkedQueue<Pair<Thread, Throwable>>(); - private boolean initialized; + private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); /** * Creates a new {@link Analysis} that skips validating the port connections and uses the default listener. @@ -116,10 +117,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught * */ private final void init() { - if (initialized) { - return; - } - initialized = true; final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); if (threadableStageJobs.isEmpty()) { @@ -139,6 +136,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } case BY_SELF_DECISION: { final RunnableProducerStage runnable = new RunnableProducerStage(stage); + producerRunnables.add(runnable); thread = createThread(runnable, stage.getId()); this.finiteProducerThreads.add(thread); InitializingSignal initializingSignal = new InitializingSignal(); @@ -147,6 +145,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } case BY_INTERRUPT: { final RunnableProducerStage runnable = new RunnableProducerStage(stage); + producerRunnables.add(runnable); thread = createThread(runnable, stage.getId()); InitializingSignal initializingSignal = new InitializingSignal(); stage.onSignal(initializingSignal, null); @@ -161,6 +160,12 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught initializeIntraStages(intraStages, thread, newListener); } + startThreads(this.consumerThreads); + startThreads(this.finiteProducerThreads); + startThreads(this.infiniteProducerThreads); + + sendInitializingSignal(); + } private Thread createThread(final AbstractRunnableStage runnable, final String name) { @@ -250,9 +255,7 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught * @since 1.1 */ public void executeNonBlocking() { - startThreads(this.consumerThreads); - startThreads(this.finiteProducerThreads); - startThreads(this.infiniteProducerThreads); + sendStartingSignal(); } private void startThreads(final Iterable<Thread> threads) { @@ -261,6 +264,20 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } } + private void sendInitializingSignal() { + for (RunnableProducerStage runnable : producerRunnables) { + InitializingSignal signal = new InitializingSignal(); + runnable.initializeProducer(signal); + } + } + + private void sendStartingSignal() { + for (RunnableProducerStage runnable : producerRunnables) { + StartingSignal signal = new StartingSignal(); + runnable.startProducer(signal); + } + } + /** * Retrieves the Configuration which was used to add and arrange all stages needed for the Analysis * diff --git a/src/main/java/teetime/framework/RunnableProducerStage.java b/src/main/java/teetime/framework/RunnableProducerStage.java index 38a72f462b9c5476f52fcbfa0aef60285fde2d72..4c6c9c7cb9fe478544e8df6fb0cec1bbf652d73d 100644 --- a/src/main/java/teetime/framework/RunnableProducerStage.java +++ b/src/main/java/teetime/framework/RunnableProducerStage.java @@ -15,19 +15,22 @@ */ package teetime.framework; +import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.StartingSignal; import teetime.framework.signal.TerminatingSignal; final class RunnableProducerStage extends AbstractRunnableStage { + private boolean initArrived, startArrived; + public RunnableProducerStage(final Stage stage) { super(stage); } @Override protected void beforeStageExecution() { - final StartingSignal startingSignal = new StartingSignal(); - this.stage.onSignal(startingSignal, null); + waitForInitializingSignal(); + waitForStartingSignal(); } @Override @@ -41,4 +44,25 @@ final class RunnableProducerStage extends AbstractRunnableStage { this.stage.onSignal(terminatingSignal, null); } + public void initializeProducer(final InitializingSignal signal) { + this.stage.onSignal(signal, null); + initArrived = true; + } + + public void startProducer(final StartingSignal signal) { + this.stage.onSignal(signal, null); + startArrived = true; + } + + public void waitForInitializingSignal() { + while (!initArrived) { + Thread.yield(); + } + } + + public void waitForStartingSignal() { + while (!startArrived) { + Thread.yield(); + } + } }