diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 7af5b4d6f24a3dc28b1bf7d6bbfc76d2b910c6ea..a23ae092f69297a93fc8648d627d20dcab04b966 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -30,18 +30,15 @@ final class RunnableConsumerStage extends AbstractRunnableStage { super(stage); } - @SuppressWarnings("PMD.GuardLogStatement") @Override protected void beforeStageExecution() throws InterruptedException { - logger.trace("Waiting for init signals... " + stage); for (InputPort<?> inputPort : stage.getInputPorts()) { inputPort.waitForInitializingSignal(); } - logger.trace("Waiting for start signals... " + stage); + for (InputPort<?> inputPort : stage.getInputPorts()) { inputPort.waitForStartSignal(); } - logger.trace("Starting... " + stage); } @Override diff --git a/src/main/java/teetime/framework/TerminationStrategy.java b/src/main/java/teetime/framework/TerminationStrategy.java index 9d56708c6d08ab40c8a0002ebb62df4872cebcf6..74a6e337c5572d161fd38ca05b68391ed4cea63a 100644 --- a/src/main/java/teetime/framework/TerminationStrategy.java +++ b/src/main/java/teetime/framework/TerminationStrategy.java @@ -16,5 +16,7 @@ package teetime.framework; public enum TerminationStrategy { + BY_SIGNAL, BY_SELF_DECISION, BY_INTERRUPT + } diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java index 985cdf0a1032e1f46337964b71f46c6bd9eadc5b..aca4c7b340ec5a46df61bad43243a9dce5cdd1b6 100644 --- a/src/main/java/teetime/framework/ThreadService.java +++ b/src/main/java/teetime/framework/ThreadService.java @@ -13,7 +13,6 @@ import org.slf4j.LoggerFactory; import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.exceptionHandling.IExceptionListenerFactory; -import teetime.framework.signal.InitializingSignal; import teetime.util.ThreadThrowableContainer; import teetime.util.framework.concurrent.SignalingCounter; @@ -150,16 +149,12 @@ class ThreadService extends AbstractService<ThreadService> { producerRunnables.add(runnable); thread = createThread(runnable, stage.getId()); this.finiteProducerThreads.add(thread); - InitializingSignal initializingSignal = new InitializingSignal(); - stage.onSignal(initializingSignal, null); break; } case BY_INTERRUPT: { final RunnableProducerStage runnable = new RunnableProducerStage(stage); producerRunnables.add(runnable); thread = createThread(runnable, stage.getId()); - InitializingSignal initializingSignal = new InitializingSignal(); - stage.onSignal(initializingSignal, null); this.infiniteProducerThreads.add(thread); break; } @@ -182,7 +177,7 @@ class ThreadService extends AbstractService<ThreadService> { } void addThreadableStage(final Stage stage, final String threadName) { - if (this.threadableStages.put(stage, threadName) != null) { + if (this.threadableStages.put(stage, threadName) != null && LOGGER.isWarnEnabled()) { LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage."); } }