diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 4e65f378ba7aa881b436f855375c5b3ec0fb3c77..1167a87201072af56b9d7063970fcb00862fa35f 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -35,6 +35,8 @@ import teetime.framework.validation.InvalidPortConnection; * @since 1.1 * @author Christian Wulf, Nelson Tavares de Sousa * + * @deprecated This concept is not yet implemented in a correct way. As soon as the concept is stable, we will remove the deprecated tag. + * */ @Deprecated public abstract class AbstractCompositeStage extends Stage { @@ -119,6 +121,11 @@ public abstract class AbstractCompositeStage extends Stage { return getFirstStage().getOwningThread(); } + @Override + public final void onInitializing() throws Exception { + getFirstStage().onInitializing(); + } + @Override public final void onValidating(final List<InvalidPortConnection> invalidPortConnections) { getFirstStage().onValidating(invalidPortConnections); diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index f2621622998c198bd69ffd3efe52a4fb581fe999..ab8d1dae94909a75c1254a40121c64392c78ad54 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -87,18 +87,23 @@ public abstract class AbstractStage extends Stage { } } + @Override + public void onInitializing() throws Exception { + this.connectUnconnectedOutputPorts(); + currentState = StageState.INITIALIZED; + logger.trace("Initialized."); + } + @Override public void onValidating(final List<InvalidPortConnection> invalidPortConnections) { this.validateOutputPorts(invalidPortConnections); currentState = StageState.VALIDATED; + logger.trace("Validated."); } @SuppressWarnings("PMD.SignatureDeclareThrowsException") @Override public void onStarting() throws Exception { - this.owningThread = Thread.currentThread(); - - this.connectUnconnectedOutputPorts(); currentState = StageState.STARTED; logger.trace("Started."); } @@ -183,13 +188,12 @@ public abstract class AbstractStage extends Stage { public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { for (OutputPort<?> outputPort : outputPorts) { final IPipe pipe = outputPort.getPipe(); - if (null != pipe) { // if output port is connected with another one - final Class<?> sourcePortType = outputPort.getType(); - final Class<?> targetPortType = pipe.getTargetPort().getType(); - if (null == sourcePortType || !sourcePortType.equals(targetPortType)) { - final InvalidPortConnection invalidPortConnection = new InvalidPortConnection(outputPort, pipe.getTargetPort()); - invalidPortConnections.add(invalidPortConnection); - } + + final Class<?> sourcePortType = outputPort.getType(); + final Class<?> targetPortType = pipe.getTargetPort().getType(); + if (null == sourcePortType || !sourcePortType.equals(targetPortType)) { + final InvalidPortConnection invalidPortConnection = new InvalidPortConnection(outputPort, pipe.getTargetPort()); + invalidPortConnections.add(invalidPortConnection); } } } diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index a1773edb88f206853e05ea5d2aaa9fb2b28e45bd..42d141bdbe6d502d4ec84331bb9062ad024ac391 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -130,43 +130,56 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught } for (Stage stage : threadableStageJobs) { - Set<Stage> intraStages = traverseIntraStages(stage); - AbstractExceptionListener newListener = factory.createInstance(); - switch (stage.getTerminationStrategy()) { + final Thread thread; + + final TerminationStrategy terminationStrategy = stage.getTerminationStrategy(); + switch (terminationStrategy) { case BY_SIGNAL: { final RunnableConsumerStage runnable = new RunnableConsumerStage(stage); - final Thread thread = createThread(newListener, intraStages, stage, runnable); + thread = createThread(runnable, stage.getId()); this.consumerThreads.add(thread); break; } case BY_SELF_DECISION: { final RunnableProducerStage runnable = new RunnableProducerStage(stage); - final Thread thread = createThread(newListener, intraStages, stage, runnable); + thread = createThread(runnable, stage.getId()); this.finiteProducerThreads.add(thread); break; } case BY_INTERRUPT: { final RunnableProducerStage runnable = new RunnableProducerStage(stage); - final Thread thread = createThread(newListener, intraStages, stage, runnable); + thread = createThread(runnable, stage.getId()); this.infiniteProducerThreads.add(thread); break; } default: - break; + throw new IllegalStateException("Unhandled termination strategy: " + terminationStrategy); } + + final Set<Stage> intraStages = traverseIntraStages(stage); + final AbstractExceptionListener newListener = factory.createInstance(); + initializeIntraStages(intraStages, thread, newListener); } } - private Thread createThread(final AbstractExceptionListener newListener, final Set<Stage> intraStages, final Stage stage, final AbstractRunnableStage runnable) { + private Thread createThread(final AbstractRunnableStage runnable, final String name) { final Thread thread = new Thread(runnable); + thread.setUncaughtExceptionHandler(this); + thread.setName(name); + return thread; + } + + private void initializeIntraStages(final Set<Stage> intraStages, final Thread thread, final AbstractExceptionListener newListener) { for (Stage intraStage : intraStages) { intraStage.setOwningThread(thread); intraStage.setExceptionHandler(newListener); + try { + intraStage.onInitializing(); + } catch (Exception e) { // NOPMD(generic framework catch) + throw new IllegalStateException("The following exception occurs within initializing the analysis:", e); + } } - thread.setUncaughtExceptionHandler(this); - thread.setName(stage.getId()); - return thread; } /** diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index ab567e82c71e7ed2fde7ce84aa5786be93671c49..aff67ce4569d48bafb9a9bc8d2a8c47e626bff49 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -128,6 +128,13 @@ public abstract class Stage { public abstract void onValidating(List<InvalidPortConnection> invalidPortConnections); + /** + * Event that is triggered within the initialization phase of the analysis. + * It does not count to the execution time. + */ + @SuppressWarnings("PMD.SignatureDeclareThrowsException") + public abstract void onInitializing() throws Exception; + @SuppressWarnings("PMD.SignatureDeclareThrowsException") public abstract void onStarting() throws Exception; @@ -137,4 +144,5 @@ public abstract class Stage { protected final void setExceptionHandler(final AbstractExceptionListener exceptionHandler) { this.exceptionHandler = exceptionHandler; } + } diff --git a/src/main/java/teetime/framework/StageState.java b/src/main/java/teetime/framework/StageState.java index cb32ff6400432a05347f7591d0d3ef36daf745d8..02be6f57b8db1d93df2556c24a5c0a4023fbb7a0 100644 --- a/src/main/java/teetime/framework/StageState.java +++ b/src/main/java/teetime/framework/StageState.java @@ -17,6 +17,7 @@ package teetime.framework; public enum StageState { + INITIALIZED, CREATED, VALIDATING, VALIDATED, STARTING, STARTED,