Skip to content
Snippets Groups Projects
Commit c6d65e0a authored by Christian Wulf's avatar Christian Wulf
Browse files

fixed EveryXthPrinter

parent 560d5cce
No related branches found
No related tags found
No related merge requests found
...@@ -3,6 +3,7 @@ package teetime.stage.io; ...@@ -3,6 +3,7 @@ package teetime.stage.io;
import java.util.List; import java.util.List;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.Stage; import teetime.framework.Stage;
import teetime.framework.TerminationStrategy; import teetime.framework.TerminationStrategy;
import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.IPipeFactory;
...@@ -12,48 +13,58 @@ import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; ...@@ -12,48 +13,58 @@ import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection; import teetime.framework.validation.InvalidPortConnection;
import teetime.stage.EveryXthStage; import teetime.stage.EveryXthStage;
import teetime.stage.basic.distributor.Distributor;
public final class EveryXthPrinter<T> extends Stage { public final class EveryXthPrinter<T> extends Stage {
private final EveryXthStage<T> everyXthStage; private final Distributor<T> distributor;
private final Printer<T> printer;
public EveryXthPrinter(final int threshold) { public EveryXthPrinter(final int threshold) {
everyXthStage = new EveryXthStage<T>(threshold); distributor = new Distributor<T>();
printer = new Printer<T>(); EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold);
Printer<T> printer = new Printer<T>();
IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
pipeFactory.create(distributor.getNewOutputPort(), everyXthStage.getInputPort());
pipeFactory.create(everyXthStage.getOutputPort(), printer.getInputPort()); pipeFactory.create(everyXthStage.getOutputPort(), printer.getInputPort());
} }
@Override @Override
protected void executeWithPorts() { protected void executeWithPorts() {
everyXthStage.executeWithPorts(); distributor.executeWithPorts();
} }
@Override @Override
public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
printer.validateOutputPorts(invalidPortConnections); distributor.validateOutputPorts(invalidPortConnections);
} }
@Override @Override
protected void onSignal(final ISignal signal, final InputPort<?> inputPort) { protected void onSignal(final ISignal signal, final InputPort<?> inputPort) {
everyXthStage.onSignal(signal, inputPort); distributor.onSignal(signal, inputPort);
} }
@Override @Override
protected TerminationStrategy getTerminationStrategy() { protected TerminationStrategy getTerminationStrategy() {
return everyXthStage.getTerminationStrategy(); return distributor.getTerminationStrategy();
} }
@Override @Override
protected void terminate() { protected void terminate() {
everyXthStage.terminate(); distributor.terminate();
} }
@Override @Override
protected boolean shouldBeTerminated() { protected boolean shouldBeTerminated() {
return everyXthStage.shouldBeTerminated(); return distributor.shouldBeTerminated();
}
public InputPort<T> getInputPort() {
return distributor.getInputPort();
}
public OutputPort<T> getNewOutputPort() {
return distributor.getNewOutputPort();
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment