Skip to content
Snippets Groups Projects
Commit 1bbbf38e authored by Sören Henning's avatar Sören Henning
Browse files

Use process time trigger in Flink as in Beam/Jet

parent 89206227
No related branches found
No related tags found
1 merge request!308Unify logging among all streaming engines
Pipeline #10188 canceled
...@@ -9,7 +9,7 @@ import org.apache.flink.api.java.functions.KeySelector; ...@@ -9,7 +9,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
...@@ -83,7 +83,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { ...@@ -83,7 +83,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
return keyFactory.createKey(record.getIdentifier(), dateTime); return keyFactory.createKey(record.getIdentifier(), dateTime);
}) })
.window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance)) .window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance))
.trigger(ContinuousEventTimeTrigger.of(triggerDuration)) .trigger(ContinuousProcessingTimeTrigger.of(triggerDuration))
.aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction()) .aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction())
.map(tuple -> { .map(tuple -> {
final String sensorId = keyFactory.getSensorId(tuple.f0); final String sensorId = keyFactory.getSensorId(tuple.f0);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment