... | ... | @@ -14,50 +14,16 @@ Moreover, you can access a default output port named `outputPort`. |
|
|
**Note:** You need to place a `terminate()` call within your `execute` method to signal the framework when your producer stage has finished its job.
|
|
|
Example producer stages are `Clock` and `InitialElementProducer`.
|
|
|
|
|
|
Inherit from `AbstractStage`, if you want to write a stage that should have neither a pre-initialized semantics nor any default input and output ports.
|
|
|
An example stage is the `Merger`.
|
|
|
<!-- Inherit from `AbstractStage`, if you want to write a stage that should have neither a pre-initialized semantics nor any default input and output ports.
|
|
|
An example stage is the `Merger`. -->
|
|
|
|
|
|
## What Code Shall I Write?
|
|
|
You only need to implement the logics your stage should represent in the `execute()` method.
|
|
|
If you need an input or an output port, declare it as class attribute by using the `createInputPort()` and `createOutputPort()` methods.
|
|
|
If your stage should have an internal state, follow these [instructions](writing-a-stage-with-a-state).
|
|
|
|
|
|
For example, consider the `InstanceOfFilter` below. It relays the element from its default input port (got from its super class `AbstractConsumerStage`) to its self-declared output port if the element is an instance of the class represented by `type`. The logics is implemented in the `execute()` method.
|
|
|
The following examples will give you a short insight on how stages can be implemented:
|
|
|
|
|
|
```java
|
|
|
public class InstanceOfFilter<I, O> extends AbstractConsumerStage<I> {
|
|
|
|
|
|
private final OutputPort<O> outputPort = this.createOutputPort();
|
|
|
|
|
|
private Class<O> type;
|
|
|
|
|
|
public InstanceOfFilter(final Class<O> type) {
|
|
|
this.type = type;
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
protected void execute(final I element) {
|
|
|
if (this.type.isInstance(element)) {
|
|
|
this.outputPort.send((O) element);
|
|
|
} else { // swallow up the element
|
|
|
if (this.logger.isDebugEnabled()) {
|
|
|
this.logger.info("element is not an instance of " + this.type.getName() + ", but of " + element.getClass());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public Class<O> getType() {
|
|
|
return this.type;
|
|
|
}
|
|
|
|
|
|
public void setType(final Class<O> type) {
|
|
|
this.type = type;
|
|
|
}
|
|
|
|
|
|
public OutputPort<O> getOutputPort() {
|
|
|
return this.outputPort;
|
|
|
}
|
|
|
|
|
|
}
|
|
|
``` |
|
|
\ No newline at end of file |
|
|
- [A simple producer stage](Producer-Stage)
|
|
|
- [A simple consumer stage](Consumer-Stage)
|
|
|
- [A consumer with multiple output ports](Multiple-OutputPorts) |