diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 5bb58812432667c97c13f7e1e8f5df8eb0411533..bc6a244aa1f329b878b5dfe68e4dc40fb7690804 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -32,10 +32,16 @@ public abstract class AbstractStage extends Stage { } } + /** + * @return the stage's input ports + */ protected InputPort<?>[] getInputPorts() { return this.cachedInputPorts; } + /** + * @return the stage's output ports + */ protected OutputPort<?>[] getOutputPorts() { return this.cachedOutputPorts; } @@ -45,7 +51,7 @@ public abstract class AbstractStage extends Stage { */ @Override public void onSignal(final ISignal signal, final InputPort<?> inputPort) { - if (!this.alreadyVisited(signal, inputPort)) { + if (!this.signalAlreadyReceived(signal, inputPort)) { signal.trigger(this); for (OutputPort<?> outputPort : this.outputPortList) { @@ -54,7 +60,14 @@ public abstract class AbstractStage extends Stage { } } - protected boolean alreadyVisited(final ISignal signal, final InputPort<?> inputPort) { + /** + * @param signal + * arriving signal + * @param inputPort + * which received the signal + * @return <code>true</code> if this stage has already received the given <code>signal</code>, <code>false</code> otherwise + */ + protected boolean signalAlreadyReceived(final ISignal signal, final InputPort<?> inputPort) { if (this.triggeredSignals.contains(signal)) { this.logger.trace("Got signal: " + signal + " again from input port: " + inputPort); return true; @@ -80,6 +93,11 @@ public abstract class AbstractStage extends Stage { this.terminate(); } + /** + * Creates and adds an InputPort to the stage + * + * @return Newly added InputPort + */ protected <T> InputPort<T> createInputPort() { final InputPort<T> inputPort = new InputPort<T>(this); // inputPort.setType(portType); @@ -87,6 +105,11 @@ public abstract class AbstractStage extends Stage { return inputPort; } + /** + * Creates and adds an OutputPort to the stage + * + * @return Newly added OutputPort + */ protected <T> OutputPort<T> createOutputPort() { final OutputPort<T> outputPort = new OutputPort<T>(); // outputPort.setType(portType); diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index e83d9a31e27048589be691f1bb86ee630ae2928a..5319ef584a478c6514e807c6723b45a7e64d7aa5 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -11,6 +11,13 @@ import org.slf4j.LoggerFactory; import teetime.util.Pair; +/** + * Represents an Analysis to which stages can be added and executed later. + * This needs a {@link AnalysisConfiguration}, + * in which the adding and configuring of stages takes place. + * To start the analysis {@link #init()} and {@link #start()} need to be executed in this order. + * This class will automatically create threads and join them without any further commitment. + */ public class Analysis implements UncaughtExceptionHandler { private static final Logger LOGGER = LoggerFactory.getLogger(Analysis.class); @@ -23,11 +30,17 @@ public class Analysis implements UncaughtExceptionHandler { private final Collection<Pair<Thread, Throwable>> exceptions = new ConcurrentLinkedQueue<Pair<Thread, Throwable>>(); + private boolean initExecuted = false; + public Analysis(final AnalysisConfiguration configuration) { this.configuration = configuration; } + /** + * This initializes Analysis and needs to be run right before starting it. + */ public void init() { + initExecuted = true; final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); for (Stage stage : threadableStageJobs) { final Thread thread = new Thread(new RunnableStage(stage)); @@ -49,10 +62,14 @@ public class Analysis implements UncaughtExceptionHandler { } /** + * This method will start the Analysis and all containing stages. * * @return a collection of thread/throwable pairs */ public Collection<Pair<Thread, Throwable>> start() { + if (!initExecuted) { + LOGGER.error("init() not executed before starting the analysis"); + } // start analysis startThreads(this.consumerThreads); startThreads(this.finiteProducerThreads); @@ -93,6 +110,11 @@ public class Analysis implements UncaughtExceptionHandler { } } + /** + * Retrieves the Configuration which was used to add and arrange all stages needed for the Analysis + * + * @return Configuration used for the Analysis + */ public AnalysisConfiguration getConfiguration() { return this.configuration; } diff --git a/src/main/java/teetime/framework/AnalysisConfiguration.java b/src/main/java/teetime/framework/AnalysisConfiguration.java index 35a11e971ff92bca8db2d49dbe62f3335393b83d..5c699b53089e07350e4a63a2dcfc7c34507c6b25 100644 --- a/src/main/java/teetime/framework/AnalysisConfiguration.java +++ b/src/main/java/teetime/framework/AnalysisConfiguration.java @@ -5,6 +5,10 @@ import java.util.List; import teetime.framework.pipe.PipeFactoryRegistry; +/** + * Represents a configuration of connected stages, which is needed to run a analysis. + * Stages can be added by executing {@link #addThreadableStage(Stage)}. + */ public class AnalysisConfiguration { protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; @@ -16,6 +20,12 @@ public class AnalysisConfiguration { return this.threadableStageJobs; } + /** + * Execute this method, to add a stage to the configuration, which should be executed in a own thread. + * + * @param stage + * A arbitrary stage, which will be added to the configuration und executed in a thread. + */ public void addThreadableStage(final Stage stage) { this.threadableStageJobs.add(stage); } diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 11cccdc9e7b4b63d8f7f3c7c150bf45d29cac422..a23ea275d0b64890187117d364e8394a2279c230 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -10,7 +10,11 @@ import org.slf4j.LoggerFactory; import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; -public abstract class Stage { +/** + * Represents a minimal Stage, with some pre-defined methods. + * Implemented stages need to adapt all abstract methods with own implementations. + */ +public abstract class Stage { // NOPMD (should not start with "Abstract") private static final ConcurrentMap<String, Integer> INSTANCES_COUNTER = new ConcurrentHashMap<String, Integer>(); @@ -18,13 +22,18 @@ public abstract class Stage { /** * A unique logger instance per stage instance */ - protected final Logger logger; + protected final Logger logger; protected Stage() { this.id = this.createId(); this.logger = LoggerFactory.getLogger(this.id); } + /** + * Retrieves the identifier associated with the stage + * + * @return An id as String + */ public String getId() { return this.id; } @@ -47,7 +56,7 @@ public abstract class Stage { return newId; } - static void clearInstanceCounters() { + static void clearInstanceCounters() { INSTANCES_COUNTER.clear(); } @@ -56,6 +65,7 @@ public abstract class Stage { // public abstract void setParentStage(Stage parentStage, int index); /** + * This should check, if the OutputPorts are connected correctly. This is needed to avoid NullPointerExceptions and other errors. * * @param invalidPortConnections * <i>(Passed as parameter for performance reasons)</i> diff --git a/src/main/java/teetime/framework/pipe/IPipe.java b/src/main/java/teetime/framework/pipe/IPipe.java index 8fee03e759208e92c3d7fef5d2c04414297ec804..daca75180dbed1f265455121319d8efe6230cafb 100644 --- a/src/main/java/teetime/framework/pipe/IPipe.java +++ b/src/main/java/teetime/framework/pipe/IPipe.java @@ -4,25 +4,69 @@ import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.signal.ISignal; +/** + * Represents a pipe that connects an output port with an input port. + */ public interface IPipe { + /** + * Adds an element to the Pipe. + * + * @param element + * Element which will be added + * @return <code>true</code> if the element could be added, false otherwise + */ boolean add(Object element); + /** + * Checks whether the pipe is empty or not. + * + * @return <code>true</code> if the pipe is empty, false otherwise. + */ boolean isEmpty(); + /** + * Retrieves the number of elements, the pipe is capable to carry at the same time. + * + * @return Number of elements + */ int size(); + /** + * Retrieves the last element of the pipe and deletes it. + * + * @return The last element in the pipe. + */ Object removeLast(); + /** + * Reads the pipe's last element, but does not delete it. + * + * @return The last element in the pipe. + */ Object readLast(); + /** + * Retrieves the receiving port. + * + * @return InputPort which is connected to the pipe. + */ InputPort<?> getTargetPort(); + /** + * A stage can pass on a signal by executing this method. The signal will be sent to the receiving stage. + * + * @param signal + * The signal which needs to be passed on. + */ void sendSignal(ISignal signal); @Deprecated <T> void connectPorts(OutputPort<? extends T> sourcePort, InputPort<T> targetPort); + /** + * Stages report new elements with this method. + */ void reportNewElement(); }