diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java index d4c88415f6c46646468586bc9e3877b3ab9c88fb..e0c2a2a0035704fd6331fb2c8d94022e5e686e8e 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java @@ -143,7 +143,7 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { || tuple.f0 == Event.SENSOR_REGISTRY_STATUS) .name("[Filter] SensorRegistry changed") .map(tuple -> SensorRegistry.fromJson(tuple.f1)).name("[Map] JSON -> SensorRegistry") - // .keyBy(sr -> 1) + .keyBy(sr -> 1) // The following flatMap is stateful so we need a key .flatMap(new ChildParentsFlatMapFunction()) .name("[FlatMap] SensorRegistry -> (ChildSensor, ParentSensor[])");