From 7818d1891283aa22c305d0f448d19c28dd16d781 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Thu, 19 Mar 2015 14:43:35 +0100 Subject: [PATCH] added InputPortSizePrinter --- .../teetime/stage/InputPortSizePrinter.java | 40 +++++++++++++++++++ .../java/teetime/stage/string/Tokenizer.java | 15 ------- 2 files changed, 40 insertions(+), 15 deletions(-) create mode 100644 src/main/java/teetime/stage/InputPortSizePrinter.java diff --git a/src/main/java/teetime/stage/InputPortSizePrinter.java b/src/main/java/teetime/stage/InputPortSizePrinter.java new file mode 100644 index 00000000..cfcac15e --- /dev/null +++ b/src/main/java/teetime/stage/InputPortSizePrinter.java @@ -0,0 +1,40 @@ +package teetime.stage; + +import java.util.concurrent.TimeUnit; + +import teetime.framework.AbstractConsumerStage; +import teetime.framework.OutputPort; +import teetime.framework.pipe.IPipe; +import teetime.util.StopWatch; + +public class InputPortSizePrinter<T> extends AbstractConsumerStage<T> { + + private final OutputPort<T> outputPort = createOutputPort(); + private final StopWatch stopWatch; + + private final long thresholdInNs = TimeUnit.SECONDS.toNanos(1); + + public InputPortSizePrinter() { + stopWatch = new StopWatch(); + stopWatch.start(); + } + + @Override + protected void execute(final T element) { + stopWatch.end(); + if (stopWatch.getDurationInNs() >= thresholdInNs) { + if (logger.isDebugEnabled()) { + final IPipe pipe = inputPort.getPipe(); + logger.debug("pipe size: " + pipe.size()); + } + stopWatch.start(); + } + + outputPort.send(element); + } + + public OutputPort<T> getOutputPort() { + return outputPort; + } + +} diff --git a/src/main/java/teetime/stage/string/Tokenizer.java b/src/main/java/teetime/stage/string/Tokenizer.java index 0a925402..7e451b5b 100644 --- a/src/main/java/teetime/stage/string/Tokenizer.java +++ b/src/main/java/teetime/stage/string/Tokenizer.java @@ -15,35 +15,20 @@ */ package teetime.stage.string; -import java.util.concurrent.TimeUnit; - import teetime.framework.AbstractConsumerStage; import teetime.framework.OutputPort; -import teetime.framework.pipe.IPipe; -import teetime.util.StopWatch; public final class Tokenizer extends AbstractConsumerStage<String> { private final OutputPort<String> outputPort = this.createOutputPort(); private final String regex; - private final StopWatch stopWatch; public Tokenizer(final String regex) { this.regex = regex; - stopWatch = new StopWatch(); - stopWatch.start(); } @Override protected void execute(final String element) { - - stopWatch.end(); - if (stopWatch.getDurationInNs() >= TimeUnit.SECONDS.toNanos(1)) { - IPipe pipe = inputPort.getPipe(); - logger.debug("pipe size: " + pipe.size()); - stopWatch.start(); - } - String[] tokens = element.split(regex); for (String token : tokens) { outputPort.send(token); -- GitLab