Skip to content
Snippets Groups Projects
Commit 315f61b6 authored by Christian Wulf's avatar Christian Wulf
Browse files

removed Stage

parent 11de3429
No related branches found
No related tags found
No related merge requests found
Showing
with 217 additions and 286 deletions
...@@ -26,14 +26,14 @@ import teetime.framework.pipe.DummyPipe; ...@@ -26,14 +26,14 @@ import teetime.framework.pipe.DummyPipe;
*/ */
class A1ThreadableStageCollector implements ITraverserVisitor { class A1ThreadableStageCollector implements ITraverserVisitor {
private final Set<Stage> threadableStages = new HashSet<Stage>(); private final Set<AbstractStage> threadableStages = new HashSet<AbstractStage>();
public Set<Stage> getThreadableStages() { public Set<AbstractStage> getThreadableStages() {
return threadableStages; return threadableStages;
} }
@Override @Override
public VisitorBehavior visit(final Stage stage) { public VisitorBehavior visit(final AbstractStage stage) {
if (stage.getOwningThread() != null && !threadableStages.contains(stage) && stage.getCurrentState() == StageState.CREATED) { if (stage.getOwningThread() != null && !threadableStages.contains(stage) && stage.getCurrentState() == StageState.CREATED) {
threadableStages.add(stage); threadableStages.add(stage);
} }
......
...@@ -31,17 +31,17 @@ public class A2InvalidThreadAssignmentCheck { ...@@ -31,17 +31,17 @@ public class A2InvalidThreadAssignmentCheck {
private static final int DEFAULT_COLOR = 0; private static final int DEFAULT_COLOR = 0;
private final Set<Stage> threadableStages; private final Set<AbstractStage> threadableStages;
public A2InvalidThreadAssignmentCheck(final Set<Stage> threadableStages) { public A2InvalidThreadAssignmentCheck(final Set<AbstractStage> threadableStages) {
this.threadableStages = threadableStages; this.threadableStages = threadableStages;
} }
public void check() { public void check() {
int color = DEFAULT_COLOR; int color = DEFAULT_COLOR;
ObjectIntMap<Stage> colors = new ObjectIntHashMap<Stage>(); ObjectIntMap<AbstractStage> colors = new ObjectIntHashMap<AbstractStage>();
for (Stage threadableStage : threadableStages) { for (AbstractStage threadableStage : threadableStages) {
color++; color++;
colors.put(threadableStage, color); colors.put(threadableStage, color);
...@@ -53,11 +53,11 @@ public class A2InvalidThreadAssignmentCheck { ...@@ -53,11 +53,11 @@ public class A2InvalidThreadAssignmentCheck {
private static class ThreadPainter implements ITraverserVisitor { private static class ThreadPainter implements ITraverserVisitor {
private final ObjectIntMap<Stage> colors; private final ObjectIntMap<AbstractStage> colors;
private final int color; private final int color;
private final Set<Stage> threadableStages; private final Set<AbstractStage> threadableStages;
public ThreadPainter(final ObjectIntMap<Stage> colors, final int color, final Set<Stage> threadableStages) { public ThreadPainter(final ObjectIntMap<AbstractStage> colors, final int color, final Set<AbstractStage> threadableStages) {
super(); super();
this.colors = colors; this.colors = colors;
this.color = color; this.color = color;
...@@ -65,7 +65,7 @@ public class A2InvalidThreadAssignmentCheck { ...@@ -65,7 +65,7 @@ public class A2InvalidThreadAssignmentCheck {
} }
@Override @Override
public VisitorBehavior visit(final Stage stage) { public VisitorBehavior visit(final AbstractStage stage) {
return VisitorBehavior.CONTINUE; return VisitorBehavior.CONTINUE;
} }
...@@ -73,7 +73,7 @@ public class A2InvalidThreadAssignmentCheck { ...@@ -73,7 +73,7 @@ public class A2InvalidThreadAssignmentCheck {
public VisitorBehavior visit(final AbstractPort<?> port) { public VisitorBehavior visit(final AbstractPort<?> port) {
IPipe<?> pipe = port.getPipe(); IPipe<?> pipe = port.getPipe();
// FIXME line below requires FORWARD. should be independent of the used direction // FIXME line below requires FORWARD. should be independent of the used direction
Stage targetStage = pipe.getTargetPort().getOwningStage(); AbstractStage targetStage = pipe.getTargetPort().getOwningStage();
int targetColor = colors.containsKey(targetStage) ? colors.get(targetStage) : DEFAULT_COLOR; int targetColor = colors.containsKey(targetStage) ? colors.get(targetStage) : DEFAULT_COLOR;
......
...@@ -44,7 +44,7 @@ class A3PipeInstantiation implements ITraverserVisitor { ...@@ -44,7 +44,7 @@ class A3PipeInstantiation implements ITraverserVisitor {
private final Set<IPipe<?>> visitedPipes = new HashSet<IPipe<?>>(); private final Set<IPipe<?>> visitedPipes = new HashSet<IPipe<?>>();
@Override @Override
public VisitorBehavior visit(final Stage stage) { public VisitorBehavior visit(final AbstractStage stage) {
return VisitorBehavior.CONTINUE; return VisitorBehavior.CONTINUE;
} }
......
...@@ -23,21 +23,21 @@ import java.util.Set; ...@@ -23,21 +23,21 @@ import java.util.Set;
class A4StageAttributeSetter { class A4StageAttributeSetter {
private final Configuration configuration; private final Configuration configuration;
private final Set<Stage> threadableStages; private final Set<AbstractStage> threadableStages;
public A4StageAttributeSetter(final Configuration configuration, final Set<Stage> threadableStages) { public A4StageAttributeSetter(final Configuration configuration, final Set<AbstractStage> threadableStages) {
super(); super();
this.configuration = configuration; this.configuration = configuration;
this.threadableStages = threadableStages; this.threadableStages = threadableStages;
} }
public void setAttributes() { public void setAttributes() {
for (Stage threadableStage : threadableStages) { for (AbstractStage threadableStage : threadableStages) {
setAttributes(threadableStage); setAttributes(threadableStage);
} }
} }
private void setAttributes(final Stage threadableStage) { private void setAttributes(final AbstractStage threadableStage) {
IntraStageCollector visitor = new IntraStageCollector(threadableStage); IntraStageCollector visitor = new IntraStageCollector(threadableStage);
Traverser traverser = new Traverser(visitor); Traverser traverser = new Traverser(visitor);
traverser.traverse(threadableStage); traverser.traverse(threadableStage);
...@@ -45,12 +45,12 @@ class A4StageAttributeSetter { ...@@ -45,12 +45,12 @@ class A4StageAttributeSetter {
setAttributes(threadableStage, traverser.getVisitedStages()); setAttributes(threadableStage, traverser.getVisitedStages());
} }
private void setAttributes(final Stage threadableStage, final Set<Stage> intraStages) { private void setAttributes(final AbstractStage threadableStage, final Set<AbstractStage> intraStages) {
threadableStage.setExceptionHandler(configuration.getFactory().createInstance(threadableStage.getOwningThread())); threadableStage.setExceptionHandler(configuration.getFactory().createInstance(threadableStage.getOwningThread()));
// threadableStage.setOwningThread(owningThread); // threadableStage.setOwningThread(owningThread);
threadableStage.setOwningContext(configuration.getContext()); threadableStage.setOwningContext(configuration.getContext());
for (Stage stage : intraStages) { for (AbstractStage stage : intraStages) {
stage.setExceptionHandler(threadableStage.exceptionListener); stage.setExceptionHandler(threadableStage.exceptionListener);
stage.setOwningThread(threadableStage.getOwningThread()); stage.setOwningThread(threadableStage.getOwningThread());
stage.setOwningContext(threadableStage.getOwningContext()); stage.setOwningContext(threadableStage.getOwningContext());
......
...@@ -26,7 +26,7 @@ public abstract class AbstractPipe<T> implements IPipe<T> { ...@@ -26,7 +26,7 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
* this.getPipe().getTargetPort().getOwningStage() * this.getPipe().getTargetPort().getOwningStage()
* </pre> * </pre>
*/ */
protected final Stage cachedTargetStage; protected final AbstractStage cachedTargetStage;
private final OutputPort<? extends T> sourcePort; private final OutputPort<? extends T> sourcePort;
private final InputPort<T> targetPort; private final InputPort<T> targetPort;
......
...@@ -27,10 +27,10 @@ public abstract class AbstractPort<T> { ...@@ -27,10 +27,10 @@ public abstract class AbstractPort<T> {
* </p> * </p>
*/ */
private final Class<T> type; private final Class<T> type;
private final Stage owningStage; private final AbstractStage owningStage;
private final String name; private final String name;
protected AbstractPort(final Class<T> type, final Stage owningStage, final String name) { protected AbstractPort(final Class<T> type, final AbstractStage owningStage, final String name) {
super(); super();
this.type = type; this.type = type;
this.owningStage = owningStage; this.owningStage = owningStage;
...@@ -41,7 +41,7 @@ public abstract class AbstractPort<T> { ...@@ -41,7 +41,7 @@ public abstract class AbstractPort<T> {
return this.type; return this.type;
} }
public Stage getOwningStage() { public AbstractStage getOwningStage() {
return owningStage; return owningStage;
} }
......
...@@ -31,13 +31,13 @@ abstract class AbstractRunnableStage implements Runnable { ...@@ -31,13 +31,13 @@ abstract class AbstractRunnableStage implements Runnable {
private final StopWatch stopWatch = new StopWatch(); private final StopWatch stopWatch = new StopWatch();
protected final Stage stage; protected final AbstractStage stage;
@SuppressWarnings("PMD.LoggerIsNotStaticFinal") @SuppressWarnings("PMD.LoggerIsNotStaticFinal")
protected final Logger logger; protected final Logger logger;
public static final Map<Stage, Long> durationsInNs = Collections.synchronizedMap(new LinkedHashMap<Stage, Long>()); public static final Map<AbstractStage, Long> durationsInNs = Collections.synchronizedMap(new LinkedHashMap<AbstractStage, Long>());
protected AbstractRunnableStage(final Stage stage) { protected AbstractRunnableStage(final AbstractStage stage) {
if (stage == null) { if (stage == null) {
throw new IllegalArgumentException("Argument stage may not be null"); throw new IllegalArgumentException("Argument stage may not be null");
} }
...@@ -48,7 +48,7 @@ abstract class AbstractRunnableStage implements Runnable { ...@@ -48,7 +48,7 @@ abstract class AbstractRunnableStage implements Runnable {
@Override @Override
public final void run() { public final void run() {
final Stage stage = this.stage; // should prevent the stage to be reloaded after a volatile read final AbstractStage stage = this.stage; // should prevent the stage to be reloaded after a volatile read
final Logger logger = this.logger; // should prevent the logger to be reloaded after a volatile read final Logger logger = this.logger; // should prevent the logger to be reloaded after a volatile read
logger.debug("Executing runnable stage..."); logger.debug("Executing runnable stage...");
...@@ -96,7 +96,7 @@ abstract class AbstractRunnableStage implements Runnable { ...@@ -96,7 +96,7 @@ abstract class AbstractRunnableStage implements Runnable {
protected abstract void afterStageExecution(); protected abstract void afterStageExecution();
static AbstractRunnableStage create(final Stage stage) { static AbstractRunnableStage create(final AbstractStage stage) {
if (stage.getInputPorts().size() > 0) { if (stage.getInputPorts().size() > 0) {
return new RunnableConsumerStage(stage); return new RunnableConsumerStage(stage);
} else { } else {
......
...@@ -20,14 +20,165 @@ import java.util.HashSet; ...@@ -20,14 +20,165 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.AbstractExceptionListener;
import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
import teetime.framework.exceptionHandling.TerminateException;
import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipe;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection; import teetime.framework.validation.InvalidPortConnection;
import teetime.util.framework.port.PortList; import teetime.util.framework.port.PortList;
import teetime.util.framework.port.PortRemovedListener; import teetime.util.framework.port.PortRemovedListener;
public abstract class AbstractStage extends Stage { /**
* Represents a minimal Stage, with some pre-defined methods.
* Implemented stages need to adapt all abstract methods with own implementations.
*/
@SuppressWarnings("PMD.AbstractNaming")
public abstract class AbstractStage {
private static final ConcurrentMap<String, Integer> INSTANCES_COUNTER = new ConcurrentHashMap<String, Integer>();
private static final NotEnoughInputException NOT_ENOUGH_INPUT_EXCEPTION = new NotEnoughInputException();
private final String id;
/**
* A unique logger instance per stage instance
*/
@SuppressWarnings("PMD.LoggerIsNotStaticFinal")
protected final Logger logger;
protected AbstractExceptionListener exceptionListener;
/** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */
private Thread owningThread;
private boolean isActive;
private ConfigurationContext owningContext;
ConfigurationContext getOwningContext() {
return owningContext;
}
void setOwningContext(final ConfigurationContext owningContext) {
this.owningContext = owningContext;
}
protected AbstractStage() {
this.id = this.createId();
this.logger = LoggerFactory.getLogger(this.getClass().getCanonicalName() + ":" + id);
}
/**
* @return an identifier that is unique among all stage instances. It is especially unique among all instances of the same stage type.
*/
public String getId() {
return this.id;
}
@Override
public String toString() {
return this.getClass().getName() + ": " + this.getId();
}
private String createId() {
String simpleName = this.getClass().getSimpleName();
Integer numInstances = INSTANCES_COUNTER.get(simpleName);
if (null == numInstances) {
numInstances = 0;
}
String newId = simpleName + "-" + numInstances;
INSTANCES_COUNTER.put(simpleName, ++numInstances);
return newId;
}
@SuppressWarnings("PMD.DefaultPackage")
static void clearInstanceCounters() {
INSTANCES_COUNTER.clear();
}
// public abstract Stage getParentStage();
//
// public abstract void setParentStage(Stage parentStage, int index);
protected final void returnNoElement() {
throw NOT_ENOUGH_INPUT_EXCEPTION;
}
protected final void executeStage() {
try {
this.execute();
} catch (NotEnoughInputException e) {
throw e;
} catch (TerminateException e) {
throw e;
} catch (Exception e) {
final FurtherExecution furtherExecution = this.exceptionListener.reportException(e, this);
if (furtherExecution == FurtherExecution.TERMINATE) {
throw TerminateException.INSTANCE;
}
}
}
protected abstract void execute();
public Thread getOwningThread() {
return owningThread;
}
void setOwningThread(final Thread owningThread) {
if (this.owningThread != null && this.owningThread != owningThread) {
// checks also for "crossing threads"
// throw new IllegalStateException("Attribute owningThread was set twice each with another thread");
}
this.owningThread = owningThread;
}
// events
protected final void setExceptionHandler(final AbstractExceptionListener exceptionHandler) {
this.exceptionListener = exceptionHandler;
}
public boolean isActive() {
return isActive;
}
void setActive(final boolean isActive) {
this.isActive = isActive;
}
/**
* Execute this method, to add a stage to the configuration, which should be executed in a own thread.
*
* @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread.
*/
public void declareActive() {
declareActive(getId());
}
/**
* Execute this method, to add a stage to the configuration, which should be executed in a own thread.
*
* @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread.
* @param threadName
* A string which can be used for debugging.
*/
public void declareActive(final String threadName) {
AbstractRunnableStage runnable = AbstractRunnableStage.create(this);
Thread newThread = new TeeTimeThread(runnable, threadName);
this.setOwningThread(newThread);
this.setActive(true);
}
private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>(); private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>();
private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>(); private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>();
...@@ -36,17 +187,14 @@ public abstract class AbstractStage extends Stage { ...@@ -36,17 +187,14 @@ public abstract class AbstractStage extends Stage {
private final PortList<OutputPort<?>> outputPorts = new PortList<OutputPort<?>>(); private final PortList<OutputPort<?>> outputPorts = new PortList<OutputPort<?>>();
private volatile StageState currentState = StageState.CREATED; private volatile StageState currentState = StageState.CREATED;
@Override
protected List<InputPort<?>> getInputPorts() { protected List<InputPort<?>> getInputPorts() {
return inputPorts.getOpenedPorts(); // TODO consider to publish a read-only version return inputPorts.getOpenedPorts(); // TODO consider to publish a read-only version
} }
@Override
protected List<OutputPort<?>> getOutputPorts() { protected List<OutputPort<?>> getOutputPorts() {
return outputPorts.getOpenedPorts(); // TODO consider to publish a read-only version return outputPorts.getOpenedPorts(); // TODO consider to publish a read-only version
} }
@Override
public StageState getCurrentState() { public StageState getCurrentState() {
return currentState; return currentState;
} }
...@@ -55,7 +203,6 @@ public abstract class AbstractStage extends Stage { ...@@ -55,7 +203,6 @@ public abstract class AbstractStage extends Stage {
* May not be invoked outside of IPipe implementations * May not be invoked outside of IPipe implementations
*/ */
@SuppressWarnings("PMD.DataflowAnomalyAnalysis") @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
@Override
public void onSignal(final ISignal signal, final InputPort<?> inputPort) { public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
Class<? extends ISignal> signalClass = signal.getClass(); Class<? extends ISignal> signalClass = signal.getClass();
...@@ -113,20 +260,24 @@ public abstract class AbstractStage extends Stage { ...@@ -113,20 +260,24 @@ public abstract class AbstractStage extends Stage {
} }
} }
@Override
public void onValidating(final List<InvalidPortConnection> invalidPortConnections) { public void onValidating(final List<InvalidPortConnection> invalidPortConnections) {
this.validateOutputPorts(invalidPortConnections); this.validateOutputPorts(invalidPortConnections);
changeState(StageState.VALIDATED); changeState(StageState.VALIDATED);
} }
/**
* Event that is triggered within the initialization phase of the analysis.
* It does not count to the execution time.
*
* @throws Exception
* an arbitrary exception if an error occurs during the initialization
*/
@SuppressWarnings("PMD.SignatureDeclareThrowsException") @SuppressWarnings("PMD.SignatureDeclareThrowsException")
@Override
public void onStarting() throws Exception { public void onStarting() throws Exception {
changeState(StageState.STARTED); changeState(StageState.STARTED);
} }
@SuppressWarnings("PMD.SignatureDeclareThrowsException") @SuppressWarnings("PMD.SignatureDeclareThrowsException")
@Override
public void onTerminating() throws Exception { public void onTerminating() throws Exception {
changeState(StageState.TERMINATED); changeState(StageState.TERMINATED);
} }
...@@ -255,8 +406,13 @@ public abstract class AbstractStage extends Stage { ...@@ -255,8 +406,13 @@ public abstract class AbstractStage extends Stage {
return outputPort; return outputPort;
} }
/**
* This should check, if the OutputPorts are connected correctly. This is needed to avoid NullPointerExceptions and other errors.
*
* @param invalidPortConnections
* <i>(Passed as parameter for performance reasons)</i>
*/
@SuppressWarnings("PMD.DataflowAnomalyAnalysis") @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
@Override
public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
for (OutputPort<?> outputPort : outputPorts.getOpenedPorts()) { for (OutputPort<?> outputPort : outputPorts.getOpenedPorts()) {
final IPipe<?> pipe = outputPort.getPipe(); final IPipe<?> pipe = outputPort.getPipe();
...@@ -270,23 +426,19 @@ public abstract class AbstractStage extends Stage { ...@@ -270,23 +426,19 @@ public abstract class AbstractStage extends Stage {
} }
} }
@Override
protected void terminate() { protected void terminate() {
changeState(StageState.TERMINATING); changeState(StageState.TERMINATING);
} }
@Override
protected void abort() { protected void abort() {
this.terminate(); this.terminate();
this.getOwningThread().interrupt(); this.getOwningThread().interrupt();
}; };
@Override
protected boolean shouldBeTerminated() { protected boolean shouldBeTerminated() {
return (getCurrentState() == StageState.TERMINATING); return (getCurrentState() == StageState.TERMINATING);
} }
@Override
protected TerminationStrategy getTerminationStrategy() { protected TerminationStrategy getTerminationStrategy() {
return TerminationStrategy.BY_SIGNAL; return TerminationStrategy.BY_SIGNAL;
} }
...@@ -303,7 +455,6 @@ public abstract class AbstractStage extends Stage { ...@@ -303,7 +455,6 @@ public abstract class AbstractStage extends Stage {
// return inputPort; // return inputPort;
// } // }
@Override
protected void removeDynamicPort(final OutputPort<?> outputPort) { protected void removeDynamicPort(final OutputPort<?> outputPort) {
outputPorts.remove(outputPort); // TODO update setIndex IF it is still used outputPorts.remove(outputPort); // TODO update setIndex IF it is still used
} }
...@@ -312,7 +463,6 @@ public abstract class AbstractStage extends Stage { ...@@ -312,7 +463,6 @@ public abstract class AbstractStage extends Stage {
outputPorts.addPortRemovedListener(outputPortRemovedListener); outputPorts.addPortRemovedListener(outputPortRemovedListener);
} }
@Override
protected void removeDynamicPort(final InputPort<?> inputPort) { protected void removeDynamicPort(final InputPort<?> inputPort) {
inputPorts.remove(inputPort); // TODO update setIndex IF it is still used inputPorts.remove(inputPort); // TODO update setIndex IF it is still used
} }
......
...@@ -33,7 +33,7 @@ public abstract class Configuration extends AbstractCompositeStage { ...@@ -33,7 +33,7 @@ public abstract class Configuration extends AbstractCompositeStage {
private boolean initialized; private boolean initialized;
private boolean executed; private boolean executed;
private Stage startStage; private AbstractStage startStage;
protected Configuration() { protected Configuration() {
this(new TerminatingExceptionListenerFactory()); this(new TerminatingExceptionListenerFactory());
...@@ -84,7 +84,7 @@ public abstract class Configuration extends AbstractCompositeStage { ...@@ -84,7 +84,7 @@ public abstract class Configuration extends AbstractCompositeStage {
return context; return context;
} }
Stage getStartStage() { AbstractStage getStartStage() {
return startStage; return startStage;
} }
......
...@@ -19,7 +19,7 @@ import java.util.Set; ...@@ -19,7 +19,7 @@ import java.util.Set;
/** /**
* Represents a context that is used by a configuration and composite stages to connect ports, for example. * Represents a context that is used by a configuration and composite stages to connect ports, for example.
* Stages can be added by executing {@link #declareActive(Stage)}. * Stages can be added by executing {@link #declareActive(AbstractStage)}.
* *
* @since 2.0 * @since 2.0
*/ */
...@@ -37,7 +37,7 @@ final class ConfigurationContext { ...@@ -37,7 +37,7 @@ final class ConfigurationContext {
this.threadService = new ThreadService(configuration); this.threadService = new ThreadService(configuration);
} }
Set<Stage> getThreadableStages() { Set<AbstractStage> getThreadableStages() {
return threadService.getThreadableStages(); return threadService.getThreadableStages();
} }
......
...@@ -76,8 +76,8 @@ public final class Execution<T extends Configuration> { ...@@ -76,8 +76,8 @@ public final class Execution<T extends Configuration> {
// BETTER validate concurrently // BETTER validate concurrently
private void validateStages() { private void validateStages() {
final Set<Stage> threadableStages = configurationContext.getThreadableStages(); final Set<AbstractStage> threadableStages = configurationContext.getThreadableStages();
for (Stage stage : threadableStages) { for (AbstractStage stage : threadableStages) {
// // portConnectionValidator.validate(stage); // // portConnectionValidator.validate(stage);
// } // }
......
...@@ -40,9 +40,9 @@ class ExecutionInstantiation { ...@@ -40,9 +40,9 @@ class ExecutionInstantiation {
void instantiatePipes() { void instantiatePipes() {
int color = DEFAULT_COLOR; int color = DEFAULT_COLOR;
Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); Map<AbstractStage, Integer> colors = new HashMap<AbstractStage, Integer>();
Set<Stage> threadableStages = context.getThreadableStages(); Set<AbstractStage> threadableStages = context.getThreadableStages();
for (Stage threadableStage : threadableStages) { for (AbstractStage threadableStage : threadableStages) {
color++; color++;
colors.put(threadableStage, color); colors.put(threadableStage, color);
...@@ -53,18 +53,18 @@ class ExecutionInstantiation { ...@@ -53,18 +53,18 @@ class ExecutionInstantiation {
private static class ThreadPainter { private static class ThreadPainter {
private final Map<Stage, Integer> colors; private final Map<AbstractStage, Integer> colors;
private final int color; private final int color;
private final Set<Stage> threadableStages; private final Set<AbstractStage> threadableStages;
public ThreadPainter(final Map<Stage, Integer> colors, final int color, final Set<Stage> threadableStages) { public ThreadPainter(final Map<AbstractStage, Integer> colors, final int color, final Set<AbstractStage> threadableStages) {
super(); super();
this.colors = colors; this.colors = colors;
this.color = color; this.color = color;
this.threadableStages = threadableStages; this.threadableStages = threadableStages;
} }
public int colorAndConnectStages(final Stage stage) { public int colorAndConnectStages(final AbstractStage stage) {
int createdConnections = 0; int createdConnections = 0;
for (OutputPort<?> outputPort : stage.getOutputPorts()) { for (OutputPort<?> outputPort : stage.getOutputPorts()) {
...@@ -82,7 +82,7 @@ class ExecutionInstantiation { ...@@ -82,7 +82,7 @@ class ExecutionInstantiation {
private int processPipe(final OutputPort outputPort, final InstantiationPipe pipe) { private int processPipe(final OutputPort outputPort, final InstantiationPipe pipe) {
int numCreatedConnections; int numCreatedConnections;
Stage targetStage = pipe.getTargetPort().getOwningStage(); AbstractStage targetStage = pipe.getTargetPort().getOwningStage();
int targetColor = colors.containsKey(targetStage) ? colors.get(targetStage) : DEFAULT_COLOR; int targetColor = colors.containsKey(targetStage) ? colors.get(targetStage) : DEFAULT_COLOR;
if (threadableStages.contains(targetStage) && targetColor != color) { if (threadableStages.contains(targetStage) && targetColor != color) {
......
...@@ -20,7 +20,7 @@ import teetime.framework.pipe.DummyPipe; ...@@ -20,7 +20,7 @@ import teetime.framework.pipe.DummyPipe;
public interface ITraverserVisitor { public interface ITraverserVisitor {
VisitorBehavior visit(Stage stage); VisitorBehavior visit(AbstractStage stage);
VisitorBehavior visit(AbstractPort<?> port); VisitorBehavior visit(AbstractPort<?> port);
......
...@@ -26,7 +26,7 @@ package teetime.framework; ...@@ -26,7 +26,7 @@ package teetime.framework;
*/ */
public final class InputPort<T> extends AbstractPort<T> { public final class InputPort<T> extends AbstractPort<T> {
InputPort(final Class<T> type, final Stage owningStage, final String portName) { InputPort(final Class<T> type, final AbstractStage owningStage, final String portName) {
super(type, owningStage, portName); super(type, owningStage, portName);
} }
......
...@@ -20,15 +20,15 @@ import teetime.framework.pipe.DummyPipe; ...@@ -20,15 +20,15 @@ import teetime.framework.pipe.DummyPipe;
public class IntraStageCollector implements ITraverserVisitor { public class IntraStageCollector implements ITraverserVisitor {
private final Stage startStage; private final AbstractStage startStage;
public IntraStageCollector(final Stage startStage) { public IntraStageCollector(final AbstractStage startStage) {
super(); super();
this.startStage = startStage; this.startStage = startStage;
} }
@Override @Override
public VisitorBehavior visit(final Stage stage) { public VisitorBehavior visit(final AbstractStage stage) {
if (stage == startStage || stage.getOwningThread() == null /* before execution */ if (stage == startStage || stage.getOwningThread() == null /* before execution */
|| stage.getOwningThread() == startStage.getOwningThread() /* while execution */) { || stage.getOwningThread() == startStage.getOwningThread() /* while execution */) {
return VisitorBehavior.CONTINUE; return VisitorBehavior.CONTINUE;
......
...@@ -30,7 +30,7 @@ import teetime.framework.signal.TerminatingSignal; ...@@ -30,7 +30,7 @@ import teetime.framework.signal.TerminatingSignal;
*/ */
public final class OutputPort<T> extends AbstractPort<T> { public final class OutputPort<T> extends AbstractPort<T> {
OutputPort(final Class<T> type, final Stage owningStage, final String portName) { OutputPort(final Class<T> type, final AbstractStage owningStage, final String portName) {
super(type, owningStage, portName); super(type, owningStage, portName);
setPipe(DummyPipe.INSTANCE); setPipe(DummyPipe.INSTANCE);
} }
......
...@@ -26,7 +26,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage { ...@@ -26,7 +26,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
* @param stage * @param stage
* to execute within an own thread * to execute within an own thread
*/ */
public RunnableConsumerStage(final Stage stage) { public RunnableConsumerStage(final AbstractStage stage) {
super(stage); super(stage);
} }
...@@ -47,7 +47,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage { ...@@ -47,7 +47,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
} }
} }
private void checkForTerminationSignal(final Stage stage) { private void checkForTerminationSignal(final AbstractStage stage) {
// FIXME should getInputPorts() really be defined in Stage? // FIXME should getInputPorts() really be defined in Stage?
for (InputPort<?> inputPort : stage.getInputPorts()) { for (InputPort<?> inputPort : stage.getInputPorts()) {
if (inputPort.isClosed()) { if (inputPort.isClosed()) {
......
...@@ -24,7 +24,7 @@ public class RunnableProducerStage extends AbstractRunnableStage { ...@@ -24,7 +24,7 @@ public class RunnableProducerStage extends AbstractRunnableStage {
private final Semaphore startSemaphore = new Semaphore(0); private final Semaphore startSemaphore = new Semaphore(0);
public RunnableProducerStage(final Stage stage) { public RunnableProducerStage(final AbstractStage stage) {
super(stage); super(stage);
} }
......
...@@ -23,7 +23,7 @@ public final class RuntimeServiceFacade { ...@@ -23,7 +23,7 @@ public final class RuntimeServiceFacade {
// singleton // singleton
} }
public void startWithinNewThread(final Stage previousStage, final Stage stage) { public void startWithinNewThread(final AbstractStage previousStage, final AbstractStage stage) {
previousStage.getOwningContext().getThreadService().startStageAtRuntime(stage); previousStage.getOwningContext().getThreadService().startStageAtRuntime(stage);
} }
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.AbstractExceptionListener;
import teetime.framework.exceptionHandling.AbstractExceptionListener.FurtherExecution;
import teetime.framework.exceptionHandling.TerminateException;
import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection;
/**
* Represents a minimal Stage, with some pre-defined methods.
* Implemented stages need to adapt all abstract methods with own implementations.
*/
@SuppressWarnings("PMD.AbstractNaming")
public abstract class Stage {
private static final ConcurrentMap<String, Integer> INSTANCES_COUNTER = new ConcurrentHashMap<String, Integer>();
private static final NotEnoughInputException NOT_ENOUGH_INPUT_EXCEPTION = new NotEnoughInputException();
private final String id;
/**
* A unique logger instance per stage instance
*/
@SuppressWarnings("PMD.LoggerIsNotStaticFinal")
protected final Logger logger;
protected AbstractExceptionListener exceptionListener;
/** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */
private Thread owningThread;
private boolean isActive;
private ConfigurationContext owningContext;
ConfigurationContext getOwningContext() {
return owningContext;
}
void setOwningContext(final ConfigurationContext owningContext) {
this.owningContext = owningContext;
}
protected Stage() {
this.id = this.createId();
this.logger = LoggerFactory.getLogger(this.getClass().getCanonicalName() + ":" + id);
}
/**
* @return an identifier that is unique among all stage instances. It is especially unique among all instances of the same stage type.
*/
public String getId() {
return this.id;
}
@Override
public String toString() {
return this.getClass().getName() + ": " + this.getId();
}
private String createId() {
String simpleName = this.getClass().getSimpleName();
Integer numInstances = INSTANCES_COUNTER.get(simpleName);
if (null == numInstances) {
numInstances = 0;
}
String newId = simpleName + "-" + numInstances;
INSTANCES_COUNTER.put(simpleName, ++numInstances);
return newId;
}
@SuppressWarnings("PMD.DefaultPackage")
static void clearInstanceCounters() {
INSTANCES_COUNTER.clear();
}
// public abstract Stage getParentStage();
//
// public abstract void setParentStage(Stage parentStage, int index);
protected final void returnNoElement() {
throw NOT_ENOUGH_INPUT_EXCEPTION;
}
/**
* This should check, if the OutputPorts are connected correctly. This is needed to avoid NullPointerExceptions and other errors.
*
* @param invalidPortConnections
* <i>(Passed as parameter for performance reasons)</i>
*/
public abstract void validateOutputPorts(List<InvalidPortConnection> invalidPortConnections);
protected final void executeStage() {
try {
this.execute();
} catch (NotEnoughInputException e) {
throw e;
} catch (TerminateException e) {
throw e;
} catch (Exception e) {
final FurtherExecution furtherExecution = this.exceptionListener.reportException(e, this);
if (furtherExecution == FurtherExecution.TERMINATE) {
throw TerminateException.INSTANCE;
}
}
}
protected abstract void execute();
protected abstract void onSignal(ISignal signal, InputPort<?> inputPort);
protected abstract TerminationStrategy getTerminationStrategy();
protected abstract void terminate();
protected abstract void abort();
protected abstract boolean shouldBeTerminated();
public abstract StageState getCurrentState();
public Thread getOwningThread() {
return owningThread;
}
void setOwningThread(final Thread owningThread) {
if (this.owningThread != null && this.owningThread != owningThread) {
// checks also for "crossing threads"
// throw new IllegalStateException("Attribute owningThread was set twice each with another thread");
}
this.owningThread = owningThread;
}
protected abstract List<InputPort<?>> getInputPorts();
protected abstract List<OutputPort<?>> getOutputPorts();
// events
public abstract void onValidating(List<InvalidPortConnection> invalidPortConnections);
/**
* Event that is triggered within the initialization phase of the analysis.
* It does not count to the execution time.
*
* @throws Exception
* an arbitrary exception if an error occurs during the initialization
*/
@SuppressWarnings("PMD.SignatureDeclareThrowsException")
public abstract void onStarting() throws Exception;
@SuppressWarnings("PMD.SignatureDeclareThrowsException")
public abstract void onTerminating() throws Exception;
protected final void setExceptionHandler(final AbstractExceptionListener exceptionHandler) {
this.exceptionListener = exceptionHandler;
}
protected abstract void removeDynamicPort(OutputPort<?> outputPort);
protected abstract void removeDynamicPort(InputPort<?> inputPort);
public boolean isActive() {
return isActive;
}
void setActive(final boolean isActive) {
this.isActive = isActive;
}
/**
* Execute this method, to add a stage to the configuration, which should be executed in a own thread.
*
* @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread.
*/
public void declareActive() {
declareActive(getId());
}
/**
* Execute this method, to add a stage to the configuration, which should be executed in a own thread.
*
* @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread.
* @param threadName
* A string which can be used for debugging.
*/
public void declareActive(final String threadName) {
AbstractRunnableStage runnable = AbstractRunnableStage.create(this);
Thread newThread = new TeeTimeThread(runnable, threadName);
this.setOwningThread(newThread);
this.setActive(true);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment