Skip to content
Snippets Groups Projects
Commit 3d9505fb authored by Christian Wulf's avatar Christian Wulf
Browse files

fixed double initialization

parent b500f87c
No related branches found
No related tags found
No related merge requests found
...@@ -30,18 +30,15 @@ final class RunnableConsumerStage extends AbstractRunnableStage { ...@@ -30,18 +30,15 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
super(stage); super(stage);
} }
@SuppressWarnings("PMD.GuardLogStatement")
@Override @Override
protected void beforeStageExecution() throws InterruptedException { protected void beforeStageExecution() throws InterruptedException {
logger.trace("Waiting for init signals... " + stage);
for (InputPort<?> inputPort : stage.getInputPorts()) { for (InputPort<?> inputPort : stage.getInputPorts()) {
inputPort.waitForInitializingSignal(); inputPort.waitForInitializingSignal();
} }
logger.trace("Waiting for start signals... " + stage);
for (InputPort<?> inputPort : stage.getInputPorts()) { for (InputPort<?> inputPort : stage.getInputPorts()) {
inputPort.waitForStartSignal(); inputPort.waitForStartSignal();
} }
logger.trace("Starting... " + stage);
} }
@Override @Override
......
...@@ -16,5 +16,7 @@ ...@@ -16,5 +16,7 @@
package teetime.framework; package teetime.framework;
public enum TerminationStrategy { public enum TerminationStrategy {
BY_SIGNAL, BY_SELF_DECISION, BY_INTERRUPT BY_SIGNAL, BY_SELF_DECISION, BY_INTERRUPT
} }
...@@ -13,7 +13,6 @@ import org.slf4j.LoggerFactory; ...@@ -13,7 +13,6 @@ import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.exceptionHandling.AbstractExceptionListener;
import teetime.framework.exceptionHandling.IExceptionListenerFactory; import teetime.framework.exceptionHandling.IExceptionListenerFactory;
import teetime.framework.signal.InitializingSignal;
import teetime.util.ThreadThrowableContainer; import teetime.util.ThreadThrowableContainer;
import teetime.util.framework.concurrent.SignalingCounter; import teetime.util.framework.concurrent.SignalingCounter;
...@@ -150,16 +149,12 @@ class ThreadService extends AbstractService<ThreadService> { ...@@ -150,16 +149,12 @@ class ThreadService extends AbstractService<ThreadService> {
producerRunnables.add(runnable); producerRunnables.add(runnable);
thread = createThread(runnable, stage.getId()); thread = createThread(runnable, stage.getId());
this.finiteProducerThreads.add(thread); this.finiteProducerThreads.add(thread);
InitializingSignal initializingSignal = new InitializingSignal();
stage.onSignal(initializingSignal, null);
break; break;
} }
case BY_INTERRUPT: { case BY_INTERRUPT: {
final RunnableProducerStage runnable = new RunnableProducerStage(stage); final RunnableProducerStage runnable = new RunnableProducerStage(stage);
producerRunnables.add(runnable); producerRunnables.add(runnable);
thread = createThread(runnable, stage.getId()); thread = createThread(runnable, stage.getId());
InitializingSignal initializingSignal = new InitializingSignal();
stage.onSignal(initializingSignal, null);
this.infiniteProducerThreads.add(thread); this.infiniteProducerThreads.add(thread);
break; break;
} }
...@@ -182,7 +177,7 @@ class ThreadService extends AbstractService<ThreadService> { ...@@ -182,7 +177,7 @@ class ThreadService extends AbstractService<ThreadService> {
} }
void addThreadableStage(final Stage stage, final String threadName) { void addThreadableStage(final Stage stage, final String threadName) {
if (this.threadableStages.put(stage, threadName) != null) { if (this.threadableStages.put(stage, threadName) != null && LOGGER.isWarnEnabled()) {
LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage."); LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage.");
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment