From f43b0eebb439f3ca21ccd7aeda457c232f7bc209 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Fri, 24 Oct 2014 11:46:46 +0200 Subject: [PATCH] fixed Delay filter --- src/main/java/teetime/stage/basic/Delay.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/main/java/teetime/stage/basic/Delay.java b/src/main/java/teetime/stage/basic/Delay.java index 09903e8c..6c0ecac5 100644 --- a/src/main/java/teetime/stage/basic/Delay.java +++ b/src/main/java/teetime/stage/basic/Delay.java @@ -1,5 +1,8 @@ package teetime.stage.basic; +import java.util.LinkedList; +import java.util.List; + import teetime.framework.AbstractStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; @@ -10,18 +13,22 @@ public class Delay<T> extends AbstractStage { private final InputPort<Long> timestampTriggerInputPort = this.createInputPort(); private final OutputPort<T> outputPort = this.createOutputPort(); + private final List<T> bufferedElements = new LinkedList<T>(); + @Override public void executeWithPorts() { + T element = inputPort.receive(); + if (null != element) { + bufferedElements.add(element); + } + Long timestampTrigger = this.timestampTriggerInputPort.receive(); if (null == timestampTrigger) { return; } - // System.out.println("got timestamp; #elements: " + this.getInputPort().pipe.size()); - // System.out.println("#elements: " + this.getInputPort().pipe.size()); - // TODO implement receiveAll() and sendMultiple() - while (!this.inputPort.getPipe().isEmpty()) { - T element = this.inputPort.receive(); + while (!bufferedElements.isEmpty()) { + element = bufferedElements.remove(0); this.send(this.outputPort, element); } } -- GitLab