Skip to content
Snippets Groups Projects
Commit ae893a07 authored by Christian Wulf's avatar Christian Wulf
Browse files

added onInitializing() to Stage

parent 0cb01562
No related branches found
No related tags found
No related merge requests found
...@@ -35,6 +35,8 @@ import teetime.framework.validation.InvalidPortConnection; ...@@ -35,6 +35,8 @@ import teetime.framework.validation.InvalidPortConnection;
* @since 1.1 * @since 1.1
* @author Christian Wulf, Nelson Tavares de Sousa * @author Christian Wulf, Nelson Tavares de Sousa
* *
* @deprecated This concept is not yet implemented in a correct way. As soon as the concept is stable, we will remove the deprecated tag.
*
*/ */
@Deprecated @Deprecated
public abstract class AbstractCompositeStage extends Stage { public abstract class AbstractCompositeStage extends Stage {
...@@ -119,6 +121,11 @@ public abstract class AbstractCompositeStage extends Stage { ...@@ -119,6 +121,11 @@ public abstract class AbstractCompositeStage extends Stage {
return getFirstStage().getOwningThread(); return getFirstStage().getOwningThread();
} }
@Override
public final void onInitializing() throws Exception {
getFirstStage().onInitializing();
}
@Override @Override
public final void onValidating(final List<InvalidPortConnection> invalidPortConnections) { public final void onValidating(final List<InvalidPortConnection> invalidPortConnections) {
getFirstStage().onValidating(invalidPortConnections); getFirstStage().onValidating(invalidPortConnections);
......
...@@ -87,18 +87,23 @@ public abstract class AbstractStage extends Stage { ...@@ -87,18 +87,23 @@ public abstract class AbstractStage extends Stage {
} }
} }
@Override
public void onInitializing() throws Exception {
this.connectUnconnectedOutputPorts();
currentState = StageState.INITIALIZED;
logger.trace("Initialized.");
}
@Override @Override
public void onValidating(final List<InvalidPortConnection> invalidPortConnections) { public void onValidating(final List<InvalidPortConnection> invalidPortConnections) {
this.validateOutputPorts(invalidPortConnections); this.validateOutputPorts(invalidPortConnections);
currentState = StageState.VALIDATED; currentState = StageState.VALIDATED;
logger.trace("Validated.");
} }
@SuppressWarnings("PMD.SignatureDeclareThrowsException") @SuppressWarnings("PMD.SignatureDeclareThrowsException")
@Override @Override
public void onStarting() throws Exception { public void onStarting() throws Exception {
this.owningThread = Thread.currentThread();
this.connectUnconnectedOutputPorts();
currentState = StageState.STARTED; currentState = StageState.STARTED;
logger.trace("Started."); logger.trace("Started.");
} }
...@@ -183,13 +188,12 @@ public abstract class AbstractStage extends Stage { ...@@ -183,13 +188,12 @@ public abstract class AbstractStage extends Stage {
public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
for (OutputPort<?> outputPort : outputPorts) { for (OutputPort<?> outputPort : outputPorts) {
final IPipe pipe = outputPort.getPipe(); final IPipe pipe = outputPort.getPipe();
if (null != pipe) { // if output port is connected with another one
final Class<?> sourcePortType = outputPort.getType(); final Class<?> sourcePortType = outputPort.getType();
final Class<?> targetPortType = pipe.getTargetPort().getType(); final Class<?> targetPortType = pipe.getTargetPort().getType();
if (null == sourcePortType || !sourcePortType.equals(targetPortType)) { if (null == sourcePortType || !sourcePortType.equals(targetPortType)) {
final InvalidPortConnection invalidPortConnection = new InvalidPortConnection(outputPort, pipe.getTargetPort()); final InvalidPortConnection invalidPortConnection = new InvalidPortConnection(outputPort, pipe.getTargetPort());
invalidPortConnections.add(invalidPortConnection); invalidPortConnections.add(invalidPortConnection);
}
} }
} }
} }
......
...@@ -130,43 +130,56 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -130,43 +130,56 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
} }
for (Stage stage : threadableStageJobs) { for (Stage stage : threadableStageJobs) {
Set<Stage> intraStages = traverseIntraStages(stage); final Thread thread;
AbstractExceptionListener newListener = factory.createInstance();
switch (stage.getTerminationStrategy()) { final TerminationStrategy terminationStrategy = stage.getTerminationStrategy();
switch (terminationStrategy) {
case BY_SIGNAL: { case BY_SIGNAL: {
final RunnableConsumerStage runnable = new RunnableConsumerStage(stage); final RunnableConsumerStage runnable = new RunnableConsumerStage(stage);
final Thread thread = createThread(newListener, intraStages, stage, runnable); thread = createThread(runnable, stage.getId());
this.consumerThreads.add(thread); this.consumerThreads.add(thread);
break; break;
} }
case BY_SELF_DECISION: { case BY_SELF_DECISION: {
final RunnableProducerStage runnable = new RunnableProducerStage(stage); final RunnableProducerStage runnable = new RunnableProducerStage(stage);
final Thread thread = createThread(newListener, intraStages, stage, runnable); thread = createThread(runnable, stage.getId());
this.finiteProducerThreads.add(thread); this.finiteProducerThreads.add(thread);
break; break;
} }
case BY_INTERRUPT: { case BY_INTERRUPT: {
final RunnableProducerStage runnable = new RunnableProducerStage(stage); final RunnableProducerStage runnable = new RunnableProducerStage(stage);
final Thread thread = createThread(newListener, intraStages, stage, runnable); thread = createThread(runnable, stage.getId());
this.infiniteProducerThreads.add(thread); this.infiniteProducerThreads.add(thread);
break; break;
} }
default: default:
break; throw new IllegalStateException("Unhandled termination strategy: " + terminationStrategy);
} }
final Set<Stage> intraStages = traverseIntraStages(stage);
final AbstractExceptionListener newListener = factory.createInstance();
initializeIntraStages(intraStages, thread, newListener);
} }
} }
private Thread createThread(final AbstractExceptionListener newListener, final Set<Stage> intraStages, final Stage stage, final AbstractRunnableStage runnable) { private Thread createThread(final AbstractRunnableStage runnable, final String name) {
final Thread thread = new Thread(runnable); final Thread thread = new Thread(runnable);
thread.setUncaughtExceptionHandler(this);
thread.setName(name);
return thread;
}
private void initializeIntraStages(final Set<Stage> intraStages, final Thread thread, final AbstractExceptionListener newListener) {
for (Stage intraStage : intraStages) { for (Stage intraStage : intraStages) {
intraStage.setOwningThread(thread); intraStage.setOwningThread(thread);
intraStage.setExceptionHandler(newListener); intraStage.setExceptionHandler(newListener);
try {
intraStage.onInitializing();
} catch (Exception e) { // NOPMD(generic framework catch)
throw new IllegalStateException("The following exception occurs within initializing the analysis:", e);
}
} }
thread.setUncaughtExceptionHandler(this);
thread.setName(stage.getId());
return thread;
} }
/** /**
......
...@@ -128,6 +128,13 @@ public abstract class Stage { ...@@ -128,6 +128,13 @@ public abstract class Stage {
public abstract void onValidating(List<InvalidPortConnection> invalidPortConnections); public abstract void onValidating(List<InvalidPortConnection> invalidPortConnections);
/**
* Event that is triggered within the initialization phase of the analysis.
* It does not count to the execution time.
*/
@SuppressWarnings("PMD.SignatureDeclareThrowsException")
public abstract void onInitializing() throws Exception;
@SuppressWarnings("PMD.SignatureDeclareThrowsException") @SuppressWarnings("PMD.SignatureDeclareThrowsException")
public abstract void onStarting() throws Exception; public abstract void onStarting() throws Exception;
...@@ -137,4 +144,5 @@ public abstract class Stage { ...@@ -137,4 +144,5 @@ public abstract class Stage {
protected final void setExceptionHandler(final AbstractExceptionListener exceptionHandler) { protected final void setExceptionHandler(final AbstractExceptionListener exceptionHandler) {
this.exceptionHandler = exceptionHandler; this.exceptionHandler = exceptionHandler;
} }
} }
...@@ -17,6 +17,7 @@ package teetime.framework; ...@@ -17,6 +17,7 @@ package teetime.framework;
public enum StageState { public enum StageState {
INITIALIZED,
CREATED, CREATED,
VALIDATING, VALIDATED, VALIDATING, VALIDATED,
STARTING, STARTED, STARTING, STARTED,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment