diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java index 7d424608215c19b6478f5a90fe3d71b0ad095519..d4c88415f6c46646468586bc9e3877b3ab9c88fb 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java @@ -62,13 +62,14 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { } @Override - protected void buildPipeline() { + protected void buildPipeline() { // NOPMD // Get configurations final String kafkaBroker = this.config.getString(Uc4ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String schemaRegistryUrl = this.config.getString(Uc4ConfigurationKeys.SCHEMA_REGISTRY_URL); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); final String outputTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String feedbackTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); final Time windowSize = Time.milliseconds(this.config.getLong(Uc4ConfigurationKeys.EMIT_PERIOD_MS)); final Duration windowGrace = @@ -87,9 +88,9 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class); // TODO Watermarks? - // Source from output topic with AggregatedPowerRecords + // Source from feedback topic with AggregatedPowerRecords final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource = - kafkaConnector.createConsumer(outputTopic, AggregatedActivePowerRecord.class); + kafkaConnector.createConsumer(feedbackTopic, AggregatedActivePowerRecord.class); final FlinkKafkaConsumerBase<Tuple2<Event, String>> kafkaConfigSource = kafkaConnector.createConsumer( @@ -107,6 +108,14 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { () -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(), Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class))); + // Sink to feedback topic with SensorId, AggregatedActivePowerRecord + final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaFeedbackSink = + kafkaConnector.createProducer( + feedbackTopic, + Serdes::String, + () -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(), + Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class))); + // Build input stream final DataStream<ActivePowerRecord> inputStream = this.env.addSource(kafkaInputSource) .name("[Kafka Consumer] Topic: " + inputTopic)// NOCS @@ -155,11 +164,13 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { .name("[Aggregate] ((Sensor, Group), ActivePowerRecord) -> AggregatedActivePowerRecord"); // add Kafka Sink - aggregationStream + final DataStream<Tuple2<String, AggregatedActivePowerRecord>> results = aggregationStream .map(value -> new Tuple2<>(value.getIdentifier(), value)) .name("[Map] AggregatedActivePowerRecord -> (Sensor, AggregatedActivePowerRecord)") - .returns(Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class))) - .addSink(kafkaAggregationSink).name("[Kafka Producer] Topic: " + outputTopic); + .returns(Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class))); + + results.addSink(kafkaAggregationSink).name("[Kafka Producer] Topic: " + outputTopic); // NOCS + results.addSink(kafkaFeedbackSink).name("[Kafka Producer] Topic: " + feedbackTopic); // NOCS } public static void main(final String[] args) { diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java index e910cd2078e8a18112fdd509272d99331082ae4f..49c38ae66184bf7c987e770bf08da80b5b02e5c1 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java @@ -11,6 +11,8 @@ public final class Uc4ConfigurationKeys { public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; public static final String EMIT_PERIOD_MS = "emit.period.ms"; diff --git a/theodolite-benchmarks/uc4-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-flink/src/main/resources/META-INF/application.properties index c34c6047da3a515919e90680143db510b7c89c84..bb26b56fa72a896ca588376e9a98b8b0251e3ec5 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc4-flink/src/main/resources/META-INF/application.properties @@ -1,13 +1,12 @@ application.name=theodolite-uc4-application application.version=0.0.1 -configuration.host=localhost -configuration.port=8082 configuration.kafka.topic=configuration kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output +kafka.feedback.topic=aggregation-feedback schema.registry.url=http://localhost:8081 emit.period.ms=5000