Skip to content
Snippets Groups Projects
Commit 9a87091a authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

Merge remote-tracking branch 'origin/master' into find-pipefact-conf

parents f203ae56 85616333
No related branches found
No related tags found
No related merge requests found
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/main/resources=UTF-8
encoding//src/test/java=UTF-8
encoding//src/test/resources=UTF-8
encoding/<project>=UTF-8
package teetime.stage.basic;
import java.util.LinkedList;
import java.util.List;
import teetime.framework.AbstractStage;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
......@@ -10,18 +13,22 @@ public class Delay<T> extends AbstractStage {
private final InputPort<Long> timestampTriggerInputPort = this.createInputPort();
private final OutputPort<T> outputPort = this.createOutputPort();
private final List<T> bufferedElements = new LinkedList<T>();
@Override
public void executeWithPorts() {
T element = inputPort.receive();
if (null != element) {
bufferedElements.add(element);
}
Long timestampTrigger = this.timestampTriggerInputPort.receive();
if (null == timestampTrigger) {
return;
}
// System.out.println("got timestamp; #elements: " + this.getInputPort().pipe.size());
// System.out.println("#elements: " + this.getInputPort().pipe.size());
// TODO implement receiveAll() and sendMultiple()
while (!this.inputPort.getPipe().isEmpty()) {
T element = this.inputPort.receive();
while (!bufferedElements.isEmpty()) {
element = bufferedElements.remove(0);
this.send(this.outputPort, element);
}
}
......
......@@ -30,7 +30,6 @@ import teetime.stage.util.TextLine;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class File2TextLinesFilter extends ConsumerStage<File> {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment