Skip to content
Snippets Groups Projects
Commit 625f26fd authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

#192 working prototype

parent 05df0801
No related branches found
No related tags found
No related merge requests found
...@@ -32,11 +32,8 @@ public abstract class AbstractCompositeStage { ...@@ -32,11 +32,8 @@ public abstract class AbstractCompositeStage {
private final ConfigurationContext context; private final ConfigurationContext context;
public AbstractCompositeStage(final ConfigurationContext context) { public AbstractCompositeStage() {
if (null == context) { this.context = new ConfigurationContext();
throw new IllegalArgumentException("Context may not be null.");
}
this.context = context;
} }
protected ConfigurationContext getContext() { protected ConfigurationContext getContext() {
... ...
......
...@@ -25,8 +25,4 @@ package teetime.framework; ...@@ -25,8 +25,4 @@ package teetime.framework;
*/ */
public abstract class Configuration extends AbstractCompositeStage { public abstract class Configuration extends AbstractCompositeStage {
public Configuration() {
super(new ConfigurationContext());
}
} }
...@@ -29,7 +29,7 @@ import teetime.framework.pipe.InstantiationPipe; ...@@ -29,7 +29,7 @@ import teetime.framework.pipe.InstantiationPipe;
* *
* @since 2.0 * @since 2.0
*/ */
public final class ConfigurationContext { public class ConfigurationContext {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class); private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class);
...@@ -63,7 +63,21 @@ public final class ConfigurationContext { ...@@ -63,7 +63,21 @@ public final class ConfigurationContext {
LOGGER.warn("Overwriting existing pipe while connecting stages " + LOGGER.warn("Overwriting existing pipe while connecting stages " +
sourcePort.getOwningStage().getId() + " and " + targetPort.getOwningStage().getId() + "."); sourcePort.getOwningStage().getId() + " and " + targetPort.getOwningStage().getId() + ".");
} }
mergeContexts(sourcePort.getOwningStage(), targetPort.getOwningStage());
new InstantiationPipe(sourcePort, targetPort, capacity); new InstantiationPipe(sourcePort, targetPort, capacity);
} }
final void mergeContexts(final Stage sourceStage, final Stage targetStage) {
if (!sourceStage.owningContext.equals(EmptyContext.getInstance())) {
this.threadableStages.putAll(sourceStage.owningContext.threadableStages);
} else {
sourceStage.owningContext = this;
}
if (!targetStage.owningContext.equals(EmptyContext.getInstance())) {
this.threadableStages.putAll(targetStage.owningContext.threadableStages);
} else {
targetStage.owningContext = this;
}
}
} }
package teetime.framework;
class EmptyContext extends ConfigurationContext {
private static final EmptyContext INSTANCE = new EmptyContext();
private EmptyContext() {
}
static EmptyContext getInstance() {
return EmptyContext.INSTANCE;
}
}
...@@ -48,6 +48,8 @@ public abstract class Stage { ...@@ -48,6 +48,8 @@ public abstract class Stage {
/** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */ /** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */
protected Thread owningThread; protected Thread owningThread;
protected ConfigurationContext owningContext = EmptyContext.getInstance();
protected Stage() { protected Stage() {
this.id = this.createId(); this.id = this.createId();
this.logger = LoggerFactory.getLogger(this.getClass().getCanonicalName() + ":" + id); this.logger = LoggerFactory.getLogger(this.getClass().getCanonicalName() + ":" + id);
... ...
......
...@@ -33,7 +33,6 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage { ...@@ -33,7 +33,6 @@ public final class EveryXthPrinter<T> extends AbstractCompositeStage {
private final List<Stage> lastStages = new ArrayList<Stage>(); private final List<Stage> lastStages = new ArrayList<Stage>();
public EveryXthPrinter(final int threshold, final ConfigurationContext context) { public EveryXthPrinter(final int threshold, final ConfigurationContext context) {
super(context);
distributor = new Distributor<T>(new CopyByReferenceStrategy()); distributor = new Distributor<T>(new CopyByReferenceStrategy());
EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold); EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold);
Printer<Integer> printer = new Printer<Integer>(); Printer<Integer> printer = new Printer<Integer>();
... ...
......
...@@ -37,7 +37,6 @@ public final class WordCounter extends AbstractCompositeStage { ...@@ -37,7 +37,6 @@ public final class WordCounter extends AbstractCompositeStage {
private final MappingCounter<String> mapCounter; private final MappingCounter<String> mapCounter;
public WordCounter(final ConfigurationContext context) { public WordCounter(final ConfigurationContext context) {
super(context);
this.tokenizer = new Tokenizer(" "); this.tokenizer = new Tokenizer(" ");
final ToLowerCase toLowerCase = new ToLowerCase(); final ToLowerCase toLowerCase = new ToLowerCase();
... ...
......
wiki @ 709c839c
Subproject commit 709c839c447a50c93b37fcc633a01297115d4823
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment