Skip to content
Snippets Groups Projects
Commit a12b86bc authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

Merge remote-tracking branch 'origin/master' into find-pipefact-conf

parents b8ba63ae ac1f27a3
No related branches found
No related tags found
No related merge requests found
eclipse.preferences.version=1 eclipse.preferences.version=1
encoding//src/main/java=UTF-8 encoding//src/main/java=UTF-8
encoding//src/main/resources=UTF-8 encoding//src/main/resources=UTF-8
encoding//src/test/java=UTF-8
encoding//src/test/resources=UTF-8 encoding//src/test/resources=UTF-8
encoding/<project>=UTF-8 encoding/<project>=UTF-8
package teetime.stage.basic; package teetime.stage.basic;
import java.util.LinkedList;
import java.util.List;
import teetime.framework.AbstractStage; import teetime.framework.AbstractStage;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
...@@ -10,18 +13,22 @@ public class Delay<T> extends AbstractStage { ...@@ -10,18 +13,22 @@ public class Delay<T> extends AbstractStage {
private final InputPort<Long> timestampTriggerInputPort = this.createInputPort(); private final InputPort<Long> timestampTriggerInputPort = this.createInputPort();
private final OutputPort<T> outputPort = this.createOutputPort(); private final OutputPort<T> outputPort = this.createOutputPort();
private final List<T> bufferedElements = new LinkedList<T>();
@Override @Override
public void executeWithPorts() { public void executeWithPorts() {
T element = inputPort.receive();
if (null != element) {
bufferedElements.add(element);
}
Long timestampTrigger = this.timestampTriggerInputPort.receive(); Long timestampTrigger = this.timestampTriggerInputPort.receive();
if (null == timestampTrigger) { if (null == timestampTrigger) {
return; return;
} }
// System.out.println("got timestamp; #elements: " + this.getInputPort().pipe.size());
// System.out.println("#elements: " + this.getInputPort().pipe.size()); while (!bufferedElements.isEmpty()) {
// TODO implement receiveAll() and sendMultiple() element = bufferedElements.remove(0);
while (!this.inputPort.getPipe().isEmpty()) {
T element = this.inputPort.receive();
this.send(this.outputPort, element); this.send(this.outputPort, element);
} }
} }
......
...@@ -30,7 +30,6 @@ import teetime.stage.util.TextLine; ...@@ -30,7 +30,6 @@ import teetime.stage.util.TextLine;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10
*/ */
public class File2TextLinesFilter extends ConsumerStage<File> { public class File2TextLinesFilter extends ConsumerStage<File> {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment