diff --git a/src/main/java/teetime/stage/EveryXthStage.java b/src/main/java/teetime/stage/EveryXthStage.java index 3ff9319df3e295bc30875652efbbd2dbb1a86a68..4f3ff730d9f88e2e8be32555fc0fe01189ac3c4c 100644 --- a/src/main/java/teetime/stage/EveryXthStage.java +++ b/src/main/java/teetime/stage/EveryXthStage.java @@ -3,9 +3,10 @@ package teetime.stage; import teetime.framework.AbstractConsumerStage; import teetime.framework.OutputPort; -public class EveryXthStage<T> extends AbstractConsumerStage<T> { +public final class EveryXthStage<T> extends AbstractConsumerStage<T> { + + private final OutputPort<Integer> outputPort = createOutputPort(); - private final OutputPort<T> outputPort = createOutputPort(); private final int threshold; private int counter; @@ -18,11 +19,11 @@ public class EveryXthStage<T> extends AbstractConsumerStage<T> { protected void execute(final T element) { counter++; if (counter % threshold == 0) { - outputPort.send(element); + outputPort.send(Integer.valueOf(counter)); } } - public OutputPort<T> getOutputPort() { + public OutputPort<Integer> getOutputPort() { return outputPort; } diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java index e0108eba9ca8689b98ea7e60434e1a245c9c181a..1af922a1521d94f5db344d878bf751771c461aa1 100644 --- a/src/main/java/teetime/stage/io/EveryXthPrinter.java +++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java @@ -13,6 +13,7 @@ import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; import teetime.stage.EveryXthStage; +import teetime.stage.basic.distributor.CopyByReferenceStrategy; import teetime.stage.basic.distributor.Distributor; public final class EveryXthPrinter<T> extends Stage { @@ -22,11 +23,13 @@ public final class EveryXthPrinter<T> extends Stage { public EveryXthPrinter(final int threshold) { distributor = new Distributor<T>(); EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold); - Printer<T> printer = new Printer<T>(); + Printer<Integer> printer = new Printer<Integer>(); IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); pipeFactory.create(distributor.getNewOutputPort(), everyXthStage.getInputPort()); pipeFactory.create(everyXthStage.getOutputPort(), printer.getInputPort()); + + distributor.setStrategy(new CopyByReferenceStrategy<T>()); } @Override diff --git a/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java b/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java index c9d8f6d5a0319c08aaa3dc7477dff8326df4e709..60baf45d3935d88312bbc7ba788e351e9ba5981e 100644 --- a/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java +++ b/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java @@ -19,9 +19,9 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; -import teetime.framework.Stage; import teetime.framework.OldHeadPipeline; import teetime.framework.RunnableStage; +import teetime.framework.Stage; import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.IPipe; import teetime.framework.pipe.PipeFactoryRegistry; @@ -35,6 +35,7 @@ import teetime.stage.Relay; import teetime.stage.StartTimestampFilter; import teetime.stage.StopTimestampFilter; import teetime.stage.basic.distributor.Distributor; +import teetime.stage.io.EveryXthPrinter; import teetime.util.ConstructorClosure; import teetime.util.TimestampObject; @@ -143,6 +144,7 @@ public class MethodCallThroughputAnalysis17 { noopFilters[i] = new NoopFilter<TimestampObject>(); } final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); + EveryXthPrinter<TimestampObject> everyXthPrinter = new EveryXthPrinter<TimestampObject>(100000); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); IPipe startPipe = new RelayTestPipe<TimestampObject>(this.numInputObjects, this.inputObjectCreator); @@ -155,7 +157,8 @@ public class MethodCallThroughputAnalysis17 { UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); } UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); - UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); + UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), everyXthPrinter.getInputPort()); + UnorderedGrowablePipe.connect(everyXthPrinter.getNewOutputPort(), collectorSink.getInputPort()); final OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(relay);