From ea2b36c685dad63efc69a6dfce941c1bf525d3d9 Mon Sep 17 00:00:00 2001 From: lorenz <stu203404@mail.uni-kiel.de> Date: Thu, 11 Nov 2021 15:28:58 +0100 Subject: [PATCH] Add ussage of common Kafka-Reader in uc1-beam-flink --- .../beam/kafka/KafkaAggregatedPowerRecordReader.java | 4 +++- .../src/main/java/application/Uc1ApplicationBeam.java | 10 ++-------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java index 7b0913810..5976838d9 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java @@ -2,6 +2,8 @@ package theodolite.commons.beam.kafka; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import java.util.Map; +import java.util.Properties; + import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.transforms.PTransform; @@ -25,7 +27,7 @@ public class KafkaAggregatedPowerRecordReader extends */ @SuppressWarnings({"unchecked", "rawtypes"}) public KafkaAggregatedPowerRecordReader(String bootstrapServer, String inputTopic, - Map<Object, Object> consumerConfig) { + Properties consumerConfig) { super(); // Check if boostrap server and inputTopic are defined diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java index 4cf6bf616..b4e1a4cef 100644 --- a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java +++ b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java @@ -21,6 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.beam.AbstractBeamService; import theodolite.commons.beam.ConfigurationKeys; +import theodolite.commons.beam.kafka.KafkaAggregatedPowerRecordReader; import titan.ccp.model.records.ActivePowerRecord; /** @@ -68,14 +69,7 @@ public final class Uc1ApplicationBeam extends AbstractBeamService { // Create Pipeline transformations final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka = - KafkaIO.<String, ActivePowerRecord>read() - .withBootstrapServers(uc1.bootstrapServer) - .withTopic(uc1.inputTopic) - .withKeyDeserializer(StringDeserializer.class) - .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, - AvroCoder.of(ActivePowerRecord.class)) - .withConsumerConfigUpdates(consumerConfig) - .withoutMetadata(); + new KafkaAggregatedPowerRecordReader(uc1.bootstrapServer, uc1.inputTopic, consumerConfig); final LogKeyValue logKeyValue = new LogKeyValue(); -- GitLab