diff --git a/src/main/java/teetime/framework/AbstractService.java b/src/main/java/teetime/framework/AbstractService.java index c617671effc38efc380342f271371d42e0baea76..4ac0200dab222c9ef0ea8a45e7edcaa33e5dd57a 100644 --- a/src/main/java/teetime/framework/AbstractService.java +++ b/src/main/java/teetime/framework/AbstractService.java @@ -12,6 +12,14 @@ package teetime.framework; */ public abstract class AbstractService<T> { - abstract void merge(T target, T source); + abstract void initialize(); + + abstract void start(); + + abstract void terminate(); + + abstract void finish(); + + abstract void merge(T source); } diff --git a/src/main/java/teetime/framework/Configuration.java b/src/main/java/teetime/framework/Configuration.java index 4a225fb22097c002aed3015b28c64f5cc950c0d4..a0bcfd9021b62779c8c2b6da24ec0151a66e001a 100644 --- a/src/main/java/teetime/framework/Configuration.java +++ b/src/main/java/teetime/framework/Configuration.java @@ -15,6 +15,9 @@ */ package teetime.framework; +import teetime.framework.exceptionHandling.IExceptionListenerFactory; +import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory; + /** * Represents a configuration of connected stages. Available to be extended. * @@ -23,10 +26,12 @@ package teetime.framework; * @since 2.0 * */ -public class Configuration extends AbstractCompositeStage { +public abstract class Configuration extends AbstractCompositeStage { private boolean executed; + private final IExceptionListenerFactory factory; + boolean isExecuted() { return executed; } @@ -35,7 +40,15 @@ public class Configuration extends AbstractCompositeStage { this.executed = executed; } + public IExceptionListenerFactory getFactory() { + return factory; + } + protected Configuration() { - // protected ctor to prevent direct instantiation. + this(new TerminatingExceptionListenerFactory()); + } + + protected Configuration(final IExceptionListenerFactory factory) { + this.factory = factory; } } diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index a1606ad66d260478ac431cd6750e41cba768f6cb..ff410c9d9162220e67277e676ff14580422b8f61 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -15,6 +15,8 @@ */ package teetime.framework; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import org.slf4j.Logger; @@ -35,6 +37,7 @@ final class ConfigurationContext { private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class); private ThreadService runtimeService = new ThreadService(); + private final List<ConfigurationContext> childs = new ArrayList<ConfigurationContext>(); // parent-child-tree ConfigurationContext() {} @@ -46,7 +49,7 @@ final class ConfigurationContext { * @see AbstractCompositeStage#addThreadableStage(Stage) */ final void addThreadableStage(final Stage stage, final String threadName) { - mergeContexts(stage); + childFunction(stage); runtimeService.addThreadableStage(stage, threadName); } @@ -63,16 +66,18 @@ final class ConfigurationContext { LOGGER.warn("Overwriting existing pipe while connecting stages " + sourcePort.getOwningStage().getId() + " and " + targetPort.getOwningStage().getId() + "."); } - mergeContexts(sourcePort.getOwningStage()); - mergeContexts(targetPort.getOwningStage()); + childFunction(sourcePort.getOwningStage()); + childFunction(targetPort.getOwningStage()); new InstantiationPipe(sourcePort, targetPort, capacity); } - final void mergeContexts(final Stage stage) { + // FIXME: Rename method + final void childFunction(final Stage stage) { if (!stage.owningContext.equals(EMPTY_CONTEXT)) { if (stage.owningContext != this) { // Performance - this.runtimeService.getThreadableStages().putAll(stage.owningContext.getRuntimeService().getThreadableStages()); - stage.owningContext.getRuntimeService().setThreadableStages(this.getRuntimeService().getThreadableStages()); + // this.runtimeService.getThreadableStages().putAll(stage.owningContext.getRuntimeService().getThreadableStages()); + // stage.owningContext.getRuntimeService().setThreadableStages(this.getRuntimeService().getThreadableStages()); + childs.add(stage.owningContext); } } else { stage.owningContext = this; @@ -80,6 +85,21 @@ final class ConfigurationContext { } + final void finalizeContext() { + for (ConfigurationContext child : childs) { + child.finalizeContext(); + mergeContexts(child); + } + } + + final void initializeServices() { + runtimeService.initialize(); + } + + private void mergeContexts(final ConfigurationContext child) { + runtimeService.merge(child.getRuntimeService()); + } + public ThreadService getRuntimeService() { return runtimeService; } diff --git a/src/main/java/teetime/framework/Execution.java b/src/main/java/teetime/framework/Execution.java index 61a80824790339cf4b311aaac19246464e541096..84396bd18ec1bcae7886a5e5b17acdad87aef6c0 100644 --- a/src/main/java/teetime/framework/Execution.java +++ b/src/main/java/teetime/framework/Execution.java @@ -16,12 +16,10 @@ package teetime.framework; import java.util.Map; -import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.exceptionHandling.IExceptionListenerFactory; import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory; import teetime.framework.signal.ValidatingSignal; @@ -131,32 +129,8 @@ public final class Execution<T extends Configuration> { ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configuration.getContext()); executionInstantiation.instantiatePipes(); - final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages().keySet(); - if (threadableStageJobs.isEmpty()) { - throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); - } - - for (Stage stage : threadableStageJobs) { - final Thread thread = initializeThreadableStages(stage); - - final Set<Stage> intraStages = traverseIntraStages(stage); - final AbstractExceptionListener newListener = factory.createInstance(); - initializeIntraStages(intraStages, thread, newListener); - } - - getConfiguration().getContext().getRuntimeService().startThreads(); - - } - - private Thread initializeThreadableStages(final Stage stage) { - return configuration.getContext().getRuntimeService().initializeThreadableStages(stage); - } - - private void initializeIntraStages(final Set<Stage> intraStages, final Thread thread, final AbstractExceptionListener newListener) { - for (Stage intraStage : intraStages) { - intraStage.setOwningThread(thread); - intraStage.setExceptionHandler(newListener); - } + getConfiguration().getContext().finalizeContext(); + getConfiguration().getContext().initializeServices(); } /** @@ -211,12 +185,6 @@ public final class Execution<T extends Configuration> { return this.configuration; } - private Set<Stage> traverseIntraStages(final Stage stage) { - final Traversor traversor = new Traversor(new IntraStageCollector()); - traversor.traverse(stage); - return traversor.getVisitedStage(); - } - /** * @return * the given ExceptionListenerFactory instance diff --git a/src/main/java/teetime/framework/ThreadService.java b/src/main/java/teetime/framework/ThreadService.java index b58da114a0ee53edb1d3108d3c309b002b3054b0..aefe24a22e1bb68796859c907b5a30b8ac0dbe27 100644 --- a/src/main/java/teetime/framework/ThreadService.java +++ b/src/main/java/teetime/framework/ThreadService.java @@ -5,11 +5,14 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import teetime.framework.exceptionHandling.AbstractExceptionListener; +import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory; import teetime.framework.signal.InitializingSignal; import teetime.util.ThreadThrowableContainer; import teetime.util.framework.concurrent.SignalingCounter; @@ -41,7 +44,33 @@ class ThreadService extends AbstractService<ThreadService> { private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); - Thread initializeThreadableStages(final Stage stage) { + @Override + void initialize() { + if (threadableStages.isEmpty()) { + throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); + } + + for (Stage stage : threadableStages.keySet()) { + final Thread thread = initialize(stage); + + final Set<Stage> intraStages = traverseIntraStages(stage); + + // FIXME: receive factory from config! + final AbstractExceptionListener newListener = new TerminatingExceptionListenerFactory().createInstance(); + initializeIntraStages(intraStages, thread, newListener); + } + + start(); + } + + private void initializeIntraStages(final Set<Stage> intraStages, final Thread thread, final AbstractExceptionListener newListener) { + for (Stage intraStage : intraStages) { + intraStage.setOwningThread(thread); + intraStage.setExceptionHandler(newListener); + } + } + + private Thread initialize(final Stage stage) { final Thread thread; final TerminationStrategy terminationStrategy = stage.getTerminationStrategy(); @@ -82,6 +111,12 @@ class ThreadService extends AbstractService<ThreadService> { return thread; } + private Set<Stage> traverseIntraStages(final Stage stage) { + final Traversor traversor = new Traversor(new IntraStageCollector()); + traversor.traverse(stage); + return traversor.getVisitedStage(); + } + void addThreadableStage(final Stage stage, final String threadName) { if (this.threadableStages.put(stage, threadName) != null) { LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage."); @@ -126,7 +161,8 @@ class ThreadService extends AbstractService<ThreadService> { sendStartingSignal(); } - void startThreads() { + @Override + void start() { startThreads(this.consumerThreads); startThreads(this.finiteProducerThreads); startThreads(this.infiniteProducerThreads); @@ -161,7 +197,19 @@ class ThreadService extends AbstractService<ThreadService> { } @Override - void merge(final ThreadService target, final ThreadService source) { + void merge(final ThreadService source) { + this.getThreadableStages().putAll(source.getThreadableStages()); + source.setThreadableStages(this.getThreadableStages()); + } + + @Override + void terminate() { + // TODO Auto-generated method stub + + } + + @Override + void finish() { // TODO Auto-generated method stub }