Skip to content
Snippets Groups Projects
Commit 0a5a0477 authored by Sören Henning's avatar Sören Henning
Browse files

Repair keyed flatMap

parent ebed16c7
No related branches found
No related tags found
No related merge requests found
Pipeline #10472 passed
......@@ -143,7 +143,7 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService {
|| tuple.f0 == Event.SENSOR_REGISTRY_STATUS)
.name("[Filter] SensorRegistry changed")
.map(tuple -> SensorRegistry.fromJson(tuple.f1)).name("[Map] JSON -> SensorRegistry")
// .keyBy(sr -> 1)
.keyBy(sr -> 1) // The following flatMap is stateful so we need a key
.flatMap(new ChildParentsFlatMapFunction())
.name("[FlatMap] SensorRegistry -> (ChildSensor, ParentSensor[])");
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment