From 1bbbf38e05c563222e72d48a145596167b70a679 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Sat, 26 Nov 2022 19:38:18 +0100 Subject: [PATCH] Use process time trigger in Flink as in Beam/Jet --- .../benchmarks/uc3/flink/HistoryServiceFlinkJob.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java index 9a85a87a8..aaaba5b3b 100644 --- a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java @@ -9,7 +9,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.common.serialization.Serdes; @@ -83,7 +83,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { return keyFactory.createKey(record.getIdentifier(), dateTime); }) .window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance)) - .trigger(ContinuousEventTimeTrigger.of(triggerDuration)) + .trigger(ContinuousProcessingTimeTrigger.of(triggerDuration)) .aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction()) .map(tuple -> { final String sensorId = keyFactory.getSensorId(tuple.f0); -- GitLab