diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java index 9a85a87a85222d5ee17f6f33bc7fa8425d5f1a03..aaaba5b3b4d86ebcdb205cdef16285b6ee47ae4c 100644 --- a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java @@ -9,7 +9,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.common.serialization.Serdes; @@ -83,7 +83,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { return keyFactory.createKey(record.getIdentifier(), dateTime); }) .window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance)) - .trigger(ContinuousEventTimeTrigger.of(triggerDuration)) + .trigger(ContinuousProcessingTimeTrigger.of(triggerDuration)) .aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction()) .map(tuple -> { final String sensorId = keyFactory.getSensorId(tuple.f0);