diff --git a/src/main/java/teetime/stage/basic/Delay.java b/src/main/java/teetime/stage/basic/Delay.java index 09903e8c447f54620a437de8b8fe6db964360cb8..6c0ecac5b2b93a86500ee939623e110e1a80f4e9 100644 --- a/src/main/java/teetime/stage/basic/Delay.java +++ b/src/main/java/teetime/stage/basic/Delay.java @@ -1,5 +1,8 @@ package teetime.stage.basic; +import java.util.LinkedList; +import java.util.List; + import teetime.framework.AbstractStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; @@ -10,18 +13,22 @@ public class Delay<T> extends AbstractStage { private final InputPort<Long> timestampTriggerInputPort = this.createInputPort(); private final OutputPort<T> outputPort = this.createOutputPort(); + private final List<T> bufferedElements = new LinkedList<T>(); + @Override public void executeWithPorts() { + T element = inputPort.receive(); + if (null != element) { + bufferedElements.add(element); + } + Long timestampTrigger = this.timestampTriggerInputPort.receive(); if (null == timestampTrigger) { return; } - // System.out.println("got timestamp; #elements: " + this.getInputPort().pipe.size()); - // System.out.println("#elements: " + this.getInputPort().pipe.size()); - // TODO implement receiveAll() and sendMultiple() - while (!this.inputPort.getPipe().isEmpty()) { - T element = this.inputPort.receive(); + while (!bufferedElements.isEmpty()) { + element = bufferedElements.remove(0); this.send(this.outputPort, element); } }