diff --git a/src/main/java/teetime/stage/InputPortSizePrinter.java b/src/main/java/teetime/stage/InputPortSizePrinter.java
new file mode 100644
index 0000000000000000000000000000000000000000..cfcac15e0eaf8a5394ce23e0fc91af21f1063ed6
--- /dev/null
+++ b/src/main/java/teetime/stage/InputPortSizePrinter.java
@@ -0,0 +1,40 @@
+package teetime.stage;
+
+import java.util.concurrent.TimeUnit;
+
+import teetime.framework.AbstractConsumerStage;
+import teetime.framework.OutputPort;
+import teetime.framework.pipe.IPipe;
+import teetime.util.StopWatch;
+
+public class InputPortSizePrinter<T> extends AbstractConsumerStage<T> {
+
+ private final OutputPort<T> outputPort = createOutputPort();
+ private final StopWatch stopWatch;
+
+ private final long thresholdInNs = TimeUnit.SECONDS.toNanos(1);
+
+ public InputPortSizePrinter() {
+ stopWatch = new StopWatch();
+ stopWatch.start();
+ }
+
+ @Override
+ protected void execute(final T element) {
+ stopWatch.end();
+ if (stopWatch.getDurationInNs() >= thresholdInNs) {
+ if (logger.isDebugEnabled()) {
+ final IPipe pipe = inputPort.getPipe();
+ logger.debug("pipe size: " + pipe.size());
+ }
+ stopWatch.start();
+ }
+
+ outputPort.send(element);
+ }
+
+ public OutputPort<T> getOutputPort() {
+ return outputPort;
+ }
+
+}
diff --git a/src/main/java/teetime/stage/string/Tokenizer.java b/src/main/java/teetime/stage/string/Tokenizer.java
index 0a9254020f9e0b2769e051d8309325c8879d3025..7e451b5b55e321e503f97355fc68622294870bcf 100644
--- a/src/main/java/teetime/stage/string/Tokenizer.java
+++ b/src/main/java/teetime/stage/string/Tokenizer.java
@@ -15,35 +15,20 @@
*/
package teetime.stage.string;
-import java.util.concurrent.TimeUnit;
-
import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort;
-import teetime.framework.pipe.IPipe;
-import teetime.util.StopWatch;
public final class Tokenizer extends AbstractConsumerStage<String> {
private final OutputPort<String> outputPort = this.createOutputPort();
private final String regex;
- private final StopWatch stopWatch;
public Tokenizer(final String regex) {
this.regex = regex;
- stopWatch = new StopWatch();
- stopWatch.start();
}
@Override
protected void execute(final String element) {
-
- stopWatch.end();
- if (stopWatch.getDurationInNs() >= TimeUnit.SECONDS.toNanos(1)) {
- IPipe pipe = inputPort.getPipe();
- logger.debug("pipe size: " + pipe.size());
- stopWatch.start();
- }
-
String[] tokens = element.split(regex);
for (String token : tokens) {
outputPort.send(token);