From c6d65e0aed0dd3dafcd15d906f700f22507fce27 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Fri, 5 Dec 2014 19:06:24 +0100 Subject: [PATCH] fixed EveryXthPrinter --- .../teetime/stage/io/EveryXthPrinter.java | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java index 1b17337f..e0108eba 100644 --- a/src/main/java/teetime/stage/io/EveryXthPrinter.java +++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java @@ -3,6 +3,7 @@ package teetime.stage.io; import java.util.List; import teetime.framework.InputPort; +import teetime.framework.OutputPort; import teetime.framework.Stage; import teetime.framework.TerminationStrategy; import teetime.framework.pipe.IPipeFactory; @@ -12,48 +13,58 @@ import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; import teetime.stage.EveryXthStage; +import teetime.stage.basic.distributor.Distributor; public final class EveryXthPrinter<T> extends Stage { - private final EveryXthStage<T> everyXthStage; - private final Printer<T> printer; + private final Distributor<T> distributor; public EveryXthPrinter(final int threshold) { - everyXthStage = new EveryXthStage<T>(threshold); - printer = new Printer<T>(); + distributor = new Distributor<T>(); + EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold); + Printer<T> printer = new Printer<T>(); IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + pipeFactory.create(distributor.getNewOutputPort(), everyXthStage.getInputPort()); pipeFactory.create(everyXthStage.getOutputPort(), printer.getInputPort()); } @Override protected void executeWithPorts() { - everyXthStage.executeWithPorts(); + distributor.executeWithPorts(); } @Override public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { - printer.validateOutputPorts(invalidPortConnections); + distributor.validateOutputPorts(invalidPortConnections); } @Override protected void onSignal(final ISignal signal, final InputPort<?> inputPort) { - everyXthStage.onSignal(signal, inputPort); + distributor.onSignal(signal, inputPort); } @Override protected TerminationStrategy getTerminationStrategy() { - return everyXthStage.getTerminationStrategy(); + return distributor.getTerminationStrategy(); } @Override protected void terminate() { - everyXthStage.terminate(); + distributor.terminate(); } @Override protected boolean shouldBeTerminated() { - return everyXthStage.shouldBeTerminated(); + return distributor.shouldBeTerminated(); + } + + public InputPort<T> getInputPort() { + return distributor.getInputPort(); + } + + public OutputPort<T> getNewOutputPort() { + return distributor.getNewOutputPort(); } } -- GitLab