Skip to content
Snippets Groups Projects
Commit 5e4c7249 authored by Christian Wulf's avatar Christian Wulf
Browse files

integrated EveryXthPrinter in one example performance test

parent c6d65e0a
No related branches found
No related tags found
No related merge requests found
...@@ -3,9 +3,10 @@ package teetime.stage; ...@@ -3,9 +3,10 @@ package teetime.stage;
import teetime.framework.AbstractConsumerStage; import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
public class EveryXthStage<T> extends AbstractConsumerStage<T> { public final class EveryXthStage<T> extends AbstractConsumerStage<T> {
private final OutputPort<Integer> outputPort = createOutputPort();
private final OutputPort<T> outputPort = createOutputPort();
private final int threshold; private final int threshold;
private int counter; private int counter;
...@@ -18,11 +19,11 @@ public class EveryXthStage<T> extends AbstractConsumerStage<T> { ...@@ -18,11 +19,11 @@ public class EveryXthStage<T> extends AbstractConsumerStage<T> {
protected void execute(final T element) { protected void execute(final T element) {
counter++; counter++;
if (counter % threshold == 0) { if (counter % threshold == 0) {
outputPort.send(element); outputPort.send(Integer.valueOf(counter));
} }
} }
public OutputPort<T> getOutputPort() { public OutputPort<Integer> getOutputPort() {
return outputPort; return outputPort;
} }
......
...@@ -13,6 +13,7 @@ import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; ...@@ -13,6 +13,7 @@ import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection; import teetime.framework.validation.InvalidPortConnection;
import teetime.stage.EveryXthStage; import teetime.stage.EveryXthStage;
import teetime.stage.basic.distributor.CopyByReferenceStrategy;
import teetime.stage.basic.distributor.Distributor; import teetime.stage.basic.distributor.Distributor;
public final class EveryXthPrinter<T> extends Stage { public final class EveryXthPrinter<T> extends Stage {
...@@ -22,11 +23,13 @@ public final class EveryXthPrinter<T> extends Stage { ...@@ -22,11 +23,13 @@ public final class EveryXthPrinter<T> extends Stage {
public EveryXthPrinter(final int threshold) { public EveryXthPrinter(final int threshold) {
distributor = new Distributor<T>(); distributor = new Distributor<T>();
EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold); EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold);
Printer<T> printer = new Printer<T>(); Printer<Integer> printer = new Printer<Integer>();
IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
pipeFactory.create(distributor.getNewOutputPort(), everyXthStage.getInputPort()); pipeFactory.create(distributor.getNewOutputPort(), everyXthStage.getInputPort());
pipeFactory.create(everyXthStage.getOutputPort(), printer.getInputPort()); pipeFactory.create(everyXthStage.getOutputPort(), printer.getInputPort());
distributor.setStrategy(new CopyByReferenceStrategy<T>());
} }
@Override @Override
......
...@@ -19,9 +19,9 @@ import java.util.ArrayList; ...@@ -19,9 +19,9 @@ import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import teetime.framework.Stage;
import teetime.framework.OldHeadPipeline; import teetime.framework.OldHeadPipeline;
import teetime.framework.RunnableStage; import teetime.framework.RunnableStage;
import teetime.framework.Stage;
import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.DummyPipe;
import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.PipeFactoryRegistry; import teetime.framework.pipe.PipeFactoryRegistry;
...@@ -35,6 +35,7 @@ import teetime.stage.Relay; ...@@ -35,6 +35,7 @@ import teetime.stage.Relay;
import teetime.stage.StartTimestampFilter; import teetime.stage.StartTimestampFilter;
import teetime.stage.StopTimestampFilter; import teetime.stage.StopTimestampFilter;
import teetime.stage.basic.distributor.Distributor; import teetime.stage.basic.distributor.Distributor;
import teetime.stage.io.EveryXthPrinter;
import teetime.util.ConstructorClosure; import teetime.util.ConstructorClosure;
import teetime.util.TimestampObject; import teetime.util.TimestampObject;
...@@ -143,6 +144,7 @@ public class MethodCallThroughputAnalysis17 { ...@@ -143,6 +144,7 @@ public class MethodCallThroughputAnalysis17 {
noopFilters[i] = new NoopFilter<TimestampObject>(); noopFilters[i] = new NoopFilter<TimestampObject>();
} }
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
EveryXthPrinter<TimestampObject> everyXthPrinter = new EveryXthPrinter<TimestampObject>(100000);
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
IPipe startPipe = new RelayTestPipe<TimestampObject>(this.numInputObjects, this.inputObjectCreator); IPipe startPipe = new RelayTestPipe<TimestampObject>(this.numInputObjects, this.inputObjectCreator);
...@@ -155,7 +157,8 @@ public class MethodCallThroughputAnalysis17 { ...@@ -155,7 +157,8 @@ public class MethodCallThroughputAnalysis17 {
UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
} }
UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), everyXthPrinter.getInputPort());
UnorderedGrowablePipe.connect(everyXthPrinter.getNewOutputPort(), collectorSink.getInputPort());
final OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); final OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(relay); pipeline.setFirstStage(relay);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment