Commit 1252d9c8 authored by Christian Wulf's avatar Christian Wulf

made CompositeProducerStage ready to be tested by our stage testing

framework
parent f7478aea
......@@ -16,7 +16,6 @@
package teetime.stage;
import teetime.framework.AbstractProducerStage;
import teetime.framework.TerminationStrategy;
import teetime.framework.termination.NextActiveStageShouldTerminate;
import teetime.framework.termination.TerminationCondition;
......@@ -71,12 +70,6 @@ public class Clock extends AbstractProducerStage<Long> {
this.terminationCondition = terminationCondition;
}
@Override
public TerminationStrategy getTerminationStrategy() {
// return TerminationStrategy.BY_INTERRUPT;
return TerminationStrategy.BY_SELF_DECISION;
}
@Override
protected void execute() {
if (this.initialDelayExceeded) {
......
......@@ -28,22 +28,28 @@ import teetime.stage.basic.merger.Merger;
*/
class CompositeProducerStage<T> extends CompositeStage {
private final Merger<T> merger;
private final InputPort<T> inputPort;
private final OutputPort<T> outputPort;
@SafeVarargs
public CompositeProducerStage(final T... elements) {
InitialElementProducer<T> producer = new InitialElementProducer<T>(elements);
merger = new Merger<T>();
Merger<T> merger = new Merger<T>();
connectPorts(producer.getOutputPort(), merger.getNewInputPort());
merger.declareActive();
// map outer ports to inner ports
inputPort = createInputPort(merger.getNewInputPort());
outputPort = createOutputPort(merger.getOutputPort());
}
InputPort<T> getInputPort() {
return merger.getNewInputPort();
return inputPort;
}
OutputPort<T> getOutputPort() {
return merger.getOutputPort();
return outputPort;
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment