From 0b5b4a26453258626b53379f3df292cb3120d8a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Tue, 1 Feb 2022 17:57:48 +0100 Subject: [PATCH] Read configuration topic from beginning --- .../application/AggregationServiceFlinkJob.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java index 3e2878a89..e4367b892 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java @@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.common.serialization.Serdes; import org.slf4j.Logger; @@ -103,9 +104,11 @@ public final class AggregationServiceFlinkJob { this.env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), // NOCS new ImmutableSetSerializer()); - this.env.getConfig().getRegisteredTypesWithKryoSerializers() - .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer " - + s.getSerializer().getClass().getName())); + this.env + .getConfig() + .getRegisteredTypesWithKryoSerializers() + .forEach((c, s) -> LOGGER.info("Class '{}' registered with serializer '{}'.", c.getName(), + s.getSerializer().getClass().getName())); } private void buildPipeline() { @@ -134,12 +137,13 @@ public final class AggregationServiceFlinkJob { final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource = kafkaConnector.createConsumer(outputTopic, AggregatedActivePowerRecord.class); - final FlinkKafkaConsumer<Tuple2<Event, String>> kafkaConfigSource = + final FlinkKafkaConsumerBase<Tuple2<Event, String>> kafkaConfigSource = kafkaConnector.createConsumer( configurationTopic, EventSerde::serde, Serdes::String, - TupleType.of(TypeInformation.of(Event.class), Types.STRING)); + TupleType.of(TypeInformation.of(Event.class), Types.STRING)) + .setStartFromEarliest(); // Sink to output topic with SensorId, AggregatedActivePowerRecord final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink = -- GitLab