diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
index 3e2878a893057024de00333492462f5029eb6d77..e4367b892e10d1e5c712e37748f163c73ff41ae9 100644
--- a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
@@ -103,9 +104,11 @@ public final class AggregationServiceFlinkJob {
     this.env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), // NOCS
         new ImmutableSetSerializer());
 
-    this.env.getConfig().getRegisteredTypesWithKryoSerializers()
-        .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
-            + s.getSerializer().getClass().getName()));
+    this.env
+        .getConfig()
+        .getRegisteredTypesWithKryoSerializers()
+        .forEach((c, s) -> LOGGER.info("Class '{}' registered with serializer '{}'.", c.getName(),
+            s.getSerializer().getClass().getName()));
   }
 
   private void buildPipeline() {
@@ -134,12 +137,13 @@ public final class AggregationServiceFlinkJob {
     final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource =
         kafkaConnector.createConsumer(outputTopic, AggregatedActivePowerRecord.class);
 
-    final FlinkKafkaConsumer<Tuple2<Event, String>> kafkaConfigSource =
+    final FlinkKafkaConsumerBase<Tuple2<Event, String>> kafkaConfigSource =
         kafkaConnector.createConsumer(
             configurationTopic,
             EventSerde::serde,
             Serdes::String,
-            TupleType.of(TypeInformation.of(Event.class), Types.STRING));
+            TupleType.of(TypeInformation.of(Event.class), Types.STRING))
+            .setStartFromEarliest();
 
     // Sink to output topic with SensorId, AggregatedActivePowerRecord
     final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink =
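
Side note (illustration, not part of the diff above): the declared type of kafkaConfigSource widens to FlinkKafkaConsumerBase because setStartFromEarliest() is defined on FlinkKafkaConsumerBase and returns FlinkKafkaConsumerBase<T>, so the chained expression no longer has the narrower FlinkKafkaConsumer type. A minimal sketch of the same pattern with a plain string consumer follows; the topic name, group id, and bootstrap servers are placeholder assumptions, not values from this repository.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

public class StartFromEarliestSketch {

  public static void main(final String[] args) {
    final Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder assumption
    props.setProperty("group.id", "sketch-group"); // placeholder assumption

    // setStartFromEarliest() returns FlinkKafkaConsumerBase<T>, so the chained
    // expression must be assigned to the base type, mirroring the change above.
    final FlinkKafkaConsumerBase<String> consumer =
        new FlinkKafkaConsumer<String>("some-topic", new SimpleStringSchema(), props)
            .setStartFromEarliest();

    // The consumer can still be registered as a source, e.g. env.addSource(consumer).
  }
}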