Skip to content
Snippets Groups Projects
Commit c89ddf01 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

#211 changed merging logic... first, contexts are collected in a

parent-child manner and afterwards merged on init
parent 68a51f91
No related branches found
No related tags found
No related merge requests found
...@@ -12,6 +12,14 @@ package teetime.framework; ...@@ -12,6 +12,14 @@ package teetime.framework;
*/ */
public abstract class AbstractService<T> { public abstract class AbstractService<T> {
abstract void merge(T target, T source); abstract void initialize();
abstract void start();
abstract void terminate();
abstract void finish();
abstract void merge(T source);
} }
...@@ -15,6 +15,9 @@ ...@@ -15,6 +15,9 @@
*/ */
package teetime.framework; package teetime.framework;
import teetime.framework.exceptionHandling.IExceptionListenerFactory;
import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory;
/** /**
* Represents a configuration of connected stages. Available to be extended. * Represents a configuration of connected stages. Available to be extended.
* *
...@@ -23,10 +26,12 @@ package teetime.framework; ...@@ -23,10 +26,12 @@ package teetime.framework;
* @since 2.0 * @since 2.0
* *
*/ */
public class Configuration extends AbstractCompositeStage { public abstract class Configuration extends AbstractCompositeStage {
private boolean executed; private boolean executed;
private final IExceptionListenerFactory factory;
boolean isExecuted() { boolean isExecuted() {
return executed; return executed;
} }
...@@ -35,7 +40,15 @@ public class Configuration extends AbstractCompositeStage { ...@@ -35,7 +40,15 @@ public class Configuration extends AbstractCompositeStage {
this.executed = executed; this.executed = executed;
} }
public IExceptionListenerFactory getFactory() {
return factory;
}
protected Configuration() { protected Configuration() {
// protected ctor to prevent direct instantiation. this(new TerminatingExceptionListenerFactory());
}
protected Configuration(final IExceptionListenerFactory factory) {
this.factory = factory;
} }
} }
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
*/ */
package teetime.framework; package teetime.framework;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -35,6 +37,7 @@ final class ConfigurationContext { ...@@ -35,6 +37,7 @@ final class ConfigurationContext {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class); private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class);
private ThreadService runtimeService = new ThreadService(); private ThreadService runtimeService = new ThreadService();
private final List<ConfigurationContext> childs = new ArrayList<ConfigurationContext>(); // parent-child-tree
ConfigurationContext() {} ConfigurationContext() {}
...@@ -46,7 +49,7 @@ final class ConfigurationContext { ...@@ -46,7 +49,7 @@ final class ConfigurationContext {
* @see AbstractCompositeStage#addThreadableStage(Stage) * @see AbstractCompositeStage#addThreadableStage(Stage)
*/ */
final void addThreadableStage(final Stage stage, final String threadName) { final void addThreadableStage(final Stage stage, final String threadName) {
mergeContexts(stage); childFunction(stage);
runtimeService.addThreadableStage(stage, threadName); runtimeService.addThreadableStage(stage, threadName);
} }
...@@ -63,16 +66,18 @@ final class ConfigurationContext { ...@@ -63,16 +66,18 @@ final class ConfigurationContext {
LOGGER.warn("Overwriting existing pipe while connecting stages " + LOGGER.warn("Overwriting existing pipe while connecting stages " +
sourcePort.getOwningStage().getId() + " and " + targetPort.getOwningStage().getId() + "."); sourcePort.getOwningStage().getId() + " and " + targetPort.getOwningStage().getId() + ".");
} }
mergeContexts(sourcePort.getOwningStage()); childFunction(sourcePort.getOwningStage());
mergeContexts(targetPort.getOwningStage()); childFunction(targetPort.getOwningStage());
new InstantiationPipe(sourcePort, targetPort, capacity); new InstantiationPipe(sourcePort, targetPort, capacity);
} }
final void mergeContexts(final Stage stage) { // FIXME: Rename method
final void childFunction(final Stage stage) {
if (!stage.owningContext.equals(EMPTY_CONTEXT)) { if (!stage.owningContext.equals(EMPTY_CONTEXT)) {
if (stage.owningContext != this) { // Performance if (stage.owningContext != this) { // Performance
this.runtimeService.getThreadableStages().putAll(stage.owningContext.getRuntimeService().getThreadableStages()); // this.runtimeService.getThreadableStages().putAll(stage.owningContext.getRuntimeService().getThreadableStages());
stage.owningContext.getRuntimeService().setThreadableStages(this.getRuntimeService().getThreadableStages()); // stage.owningContext.getRuntimeService().setThreadableStages(this.getRuntimeService().getThreadableStages());
childs.add(stage.owningContext);
} }
} else { } else {
stage.owningContext = this; stage.owningContext = this;
...@@ -80,6 +85,21 @@ final class ConfigurationContext { ...@@ -80,6 +85,21 @@ final class ConfigurationContext {
} }
final void finalizeContext() {
for (ConfigurationContext child : childs) {
child.finalizeContext();
mergeContexts(child);
}
}
final void initializeServices() {
runtimeService.initialize();
}
private void mergeContexts(final ConfigurationContext child) {
runtimeService.merge(child.getRuntimeService());
}
public ThreadService getRuntimeService() { public ThreadService getRuntimeService() {
return runtimeService; return runtimeService;
} }
......
...@@ -16,12 +16,10 @@ ...@@ -16,12 +16,10 @@
package teetime.framework; package teetime.framework;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.AbstractExceptionListener;
import teetime.framework.exceptionHandling.IExceptionListenerFactory; import teetime.framework.exceptionHandling.IExceptionListenerFactory;
import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory; import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory;
import teetime.framework.signal.ValidatingSignal; import teetime.framework.signal.ValidatingSignal;
...@@ -131,32 +129,8 @@ public final class Execution<T extends Configuration> { ...@@ -131,32 +129,8 @@ public final class Execution<T extends Configuration> {
ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configuration.getContext()); ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configuration.getContext());
executionInstantiation.instantiatePipes(); executionInstantiation.instantiatePipes();
final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages().keySet(); getConfiguration().getContext().finalizeContext();
if (threadableStageJobs.isEmpty()) { getConfiguration().getContext().initializeServices();
throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage.");
}
for (Stage stage : threadableStageJobs) {
final Thread thread = initializeThreadableStages(stage);
final Set<Stage> intraStages = traverseIntraStages(stage);
final AbstractExceptionListener newListener = factory.createInstance();
initializeIntraStages(intraStages, thread, newListener);
}
getConfiguration().getContext().getRuntimeService().startThreads();
}
private Thread initializeThreadableStages(final Stage stage) {
return configuration.getContext().getRuntimeService().initializeThreadableStages(stage);
}
private void initializeIntraStages(final Set<Stage> intraStages, final Thread thread, final AbstractExceptionListener newListener) {
for (Stage intraStage : intraStages) {
intraStage.setOwningThread(thread);
intraStage.setExceptionHandler(newListener);
}
} }
/** /**
...@@ -211,12 +185,6 @@ public final class Execution<T extends Configuration> { ...@@ -211,12 +185,6 @@ public final class Execution<T extends Configuration> {
return this.configuration; return this.configuration;
} }
private Set<Stage> traverseIntraStages(final Stage stage) {
final Traversor traversor = new Traversor(new IntraStageCollector());
traversor.traverse(stage);
return traversor.getVisitedStage();
}
/** /**
* @return * @return
* the given ExceptionListenerFactory instance * the given ExceptionListenerFactory instance
......
...@@ -5,11 +5,14 @@ import java.util.HashMap; ...@@ -5,11 +5,14 @@ import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.AbstractExceptionListener;
import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory;
import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.InitializingSignal;
import teetime.util.ThreadThrowableContainer; import teetime.util.ThreadThrowableContainer;
import teetime.util.framework.concurrent.SignalingCounter; import teetime.util.framework.concurrent.SignalingCounter;
...@@ -41,7 +44,33 @@ class ThreadService extends AbstractService<ThreadService> { ...@@ -41,7 +44,33 @@ class ThreadService extends AbstractService<ThreadService> {
private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>(); private final List<RunnableProducerStage> producerRunnables = new LinkedList<RunnableProducerStage>();
Thread initializeThreadableStages(final Stage stage) { @Override
void initialize() {
if (threadableStages.isEmpty()) {
throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage.");
}
for (Stage stage : threadableStages.keySet()) {
final Thread thread = initialize(stage);
final Set<Stage> intraStages = traverseIntraStages(stage);
// FIXME: receive factory from config!
final AbstractExceptionListener newListener = new TerminatingExceptionListenerFactory().createInstance();
initializeIntraStages(intraStages, thread, newListener);
}
start();
}
private void initializeIntraStages(final Set<Stage> intraStages, final Thread thread, final AbstractExceptionListener newListener) {
for (Stage intraStage : intraStages) {
intraStage.setOwningThread(thread);
intraStage.setExceptionHandler(newListener);
}
}
private Thread initialize(final Stage stage) {
final Thread thread; final Thread thread;
final TerminationStrategy terminationStrategy = stage.getTerminationStrategy(); final TerminationStrategy terminationStrategy = stage.getTerminationStrategy();
...@@ -82,6 +111,12 @@ class ThreadService extends AbstractService<ThreadService> { ...@@ -82,6 +111,12 @@ class ThreadService extends AbstractService<ThreadService> {
return thread; return thread;
} }
private Set<Stage> traverseIntraStages(final Stage stage) {
final Traversor traversor = new Traversor(new IntraStageCollector());
traversor.traverse(stage);
return traversor.getVisitedStage();
}
void addThreadableStage(final Stage stage, final String threadName) { void addThreadableStage(final Stage stage, final String threadName) {
if (this.threadableStages.put(stage, threadName) != null) { if (this.threadableStages.put(stage, threadName) != null) {
LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage."); LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage.");
...@@ -126,7 +161,8 @@ class ThreadService extends AbstractService<ThreadService> { ...@@ -126,7 +161,8 @@ class ThreadService extends AbstractService<ThreadService> {
sendStartingSignal(); sendStartingSignal();
} }
void startThreads() { @Override
void start() {
startThreads(this.consumerThreads); startThreads(this.consumerThreads);
startThreads(this.finiteProducerThreads); startThreads(this.finiteProducerThreads);
startThreads(this.infiniteProducerThreads); startThreads(this.infiniteProducerThreads);
...@@ -161,7 +197,19 @@ class ThreadService extends AbstractService<ThreadService> { ...@@ -161,7 +197,19 @@ class ThreadService extends AbstractService<ThreadService> {
} }
@Override @Override
void merge(final ThreadService target, final ThreadService source) { void merge(final ThreadService source) {
this.getThreadableStages().putAll(source.getThreadableStages());
source.setThreadableStages(this.getThreadableStages());
}
@Override
void terminate() {
// TODO Auto-generated method stub
}
@Override
void finish() {
// TODO Auto-generated method stub // TODO Auto-generated method stub
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment