Skip to content
Snippets Groups Projects
Commit 0b5b4a26 authored by Sören Henning's avatar Sören Henning
Browse files

Read configuration topic from beginning

parent 47ec6dbc
No related branches found
No related tags found
No related merge requests found
Pipeline #6381 passed
......@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.serialization.Serdes;
import org.slf4j.Logger;
......@@ -103,9 +104,11 @@ public final class AggregationServiceFlinkJob {
this.env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), // NOCS
new ImmutableSetSerializer());
this.env.getConfig().getRegisteredTypesWithKryoSerializers()
.forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
+ s.getSerializer().getClass().getName()));
this.env
.getConfig()
.getRegisteredTypesWithKryoSerializers()
.forEach((c, s) -> LOGGER.info("Class '{}' registered with serializer '{}'.", c.getName(),
s.getSerializer().getClass().getName()));
}
private void buildPipeline() {
......@@ -134,12 +137,13 @@ public final class AggregationServiceFlinkJob {
final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource =
kafkaConnector.createConsumer(outputTopic, AggregatedActivePowerRecord.class);
final FlinkKafkaConsumer<Tuple2<Event, String>> kafkaConfigSource =
final FlinkKafkaConsumerBase<Tuple2<Event, String>> kafkaConfigSource =
kafkaConnector.createConsumer(
configurationTopic,
EventSerde::serde,
Serdes::String,
TupleType.of(TypeInformation.of(Event.class), Types.STRING));
TupleType.of(TypeInformation.of(Event.class), Types.STRING))
.setStartFromEarliest();
// Sink to output topic with SensorId, AggregatedActivePowerRecord
final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink =
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment