Skip to content
Snippets Groups Projects
Commit c2db6348 authored by Sören Henning's avatar Sören Henning
Browse files

Add feedback topic fo Flink's UC4

parent 09fd2eed
No related branches found
No related tags found
No related merge requests found
Pipeline #10443 failed
...@@ -62,13 +62,14 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { ...@@ -62,13 +62,14 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService {
} }
@Override @Override
protected void buildPipeline() { protected void buildPipeline() { // NOPMD
// Get configurations // Get configurations
final String kafkaBroker = this.config.getString(Uc4ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String kafkaBroker = this.config.getString(Uc4ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
final String schemaRegistryUrl = final String schemaRegistryUrl =
this.config.getString(Uc4ConfigurationKeys.SCHEMA_REGISTRY_URL); this.config.getString(Uc4ConfigurationKeys.SCHEMA_REGISTRY_URL);
final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
final String outputTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final String outputTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final String feedbackTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_FEEDBACK_TOPIC);
final Time windowSize = final Time windowSize =
Time.milliseconds(this.config.getLong(Uc4ConfigurationKeys.EMIT_PERIOD_MS)); Time.milliseconds(this.config.getLong(Uc4ConfigurationKeys.EMIT_PERIOD_MS));
final Duration windowGrace = final Duration windowGrace =
...@@ -87,9 +88,9 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { ...@@ -87,9 +88,9 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService {
kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class); kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
// TODO Watermarks? // TODO Watermarks?
// Source from output topic with AggregatedPowerRecords // Source from feedback topic with AggregatedPowerRecords
final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource = final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource =
kafkaConnector.createConsumer(outputTopic, AggregatedActivePowerRecord.class); kafkaConnector.createConsumer(feedbackTopic, AggregatedActivePowerRecord.class);
final FlinkKafkaConsumerBase<Tuple2<Event, String>> kafkaConfigSource = final FlinkKafkaConsumerBase<Tuple2<Event, String>> kafkaConfigSource =
kafkaConnector.createConsumer( kafkaConnector.createConsumer(
...@@ -107,6 +108,14 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { ...@@ -107,6 +108,14 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService {
() -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(), () -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(),
Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class))); Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class)));
// Sink to feedback topic with SensorId, AggregatedActivePowerRecord
final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaFeedbackSink =
kafkaConnector.createProducer(
feedbackTopic,
Serdes::String,
() -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(),
Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class)));
// Build input stream // Build input stream
final DataStream<ActivePowerRecord> inputStream = this.env.addSource(kafkaInputSource) final DataStream<ActivePowerRecord> inputStream = this.env.addSource(kafkaInputSource)
.name("[Kafka Consumer] Topic: " + inputTopic)// NOCS .name("[Kafka Consumer] Topic: " + inputTopic)// NOCS
...@@ -155,11 +164,13 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { ...@@ -155,11 +164,13 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService {
.name("[Aggregate] ((Sensor, Group), ActivePowerRecord) -> AggregatedActivePowerRecord"); .name("[Aggregate] ((Sensor, Group), ActivePowerRecord) -> AggregatedActivePowerRecord");
// add Kafka Sink // add Kafka Sink
aggregationStream final DataStream<Tuple2<String, AggregatedActivePowerRecord>> results = aggregationStream
.map(value -> new Tuple2<>(value.getIdentifier(), value)) .map(value -> new Tuple2<>(value.getIdentifier(), value))
.name("[Map] AggregatedActivePowerRecord -> (Sensor, AggregatedActivePowerRecord)") .name("[Map] AggregatedActivePowerRecord -> (Sensor, AggregatedActivePowerRecord)")
.returns(Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class))) .returns(Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class)));
.addSink(kafkaAggregationSink).name("[Kafka Producer] Topic: " + outputTopic);
results.addSink(kafkaAggregationSink).name("[Kafka Producer] Topic: " + outputTopic); // NOCS
results.addSink(kafkaFeedbackSink).name("[Kafka Producer] Topic: " + feedbackTopic); // NOCS
} }
public static void main(final String[] args) { public static void main(final String[] args) {
......
...@@ -11,6 +11,8 @@ public final class Uc4ConfigurationKeys { ...@@ -11,6 +11,8 @@ public final class Uc4ConfigurationKeys {
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String EMIT_PERIOD_MS = "emit.period.ms"; public static final String EMIT_PERIOD_MS = "emit.period.ms";
......
application.name=theodolite-uc4-application application.name=theodolite-uc4-application
application.version=0.0.1 application.version=0.0.1
configuration.host=localhost
configuration.port=8082
configuration.kafka.topic=configuration configuration.kafka.topic=configuration
kafka.bootstrap.servers=localhost:9092 kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input kafka.input.topic=input
kafka.output.topic=output kafka.output.topic=output
kafka.feedback.topic=aggregation-feedback
schema.registry.url=http://localhost:8081 schema.registry.url=http://localhost:8081
emit.period.ms=5000 emit.period.ms=5000
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment