diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java index 7b09138107332994bf05842d9d26f96545ecba4a..5976838d9fbe2ce3b3bc7f57df1e96f9ef8820f9 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java @@ -2,6 +2,8 @@ package theodolite.commons.beam.kafka; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import java.util.Map; +import java.util.Properties; + import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.transforms.PTransform; @@ -25,7 +27,7 @@ public class KafkaAggregatedPowerRecordReader extends */ @SuppressWarnings({"unchecked", "rawtypes"}) public KafkaAggregatedPowerRecordReader(String bootstrapServer, String inputTopic, - Map<Object, Object> consumerConfig) { + Properties consumerConfig) { super(); // Check if boostrap server and inputTopic are defined diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java index 4cf6bf6167e79cd99fadec46f2245895c23f78ce..b4e1a4cef0063dd29f3bf86ea54a5bba62bf2dbf 100644 --- a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java +++ b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java @@ -21,6 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.beam.AbstractBeamService; import theodolite.commons.beam.ConfigurationKeys; +import theodolite.commons.beam.kafka.KafkaAggregatedPowerRecordReader; import titan.ccp.model.records.ActivePowerRecord; /** @@ -68,14 +69,7 @@ public final class Uc1ApplicationBeam extends AbstractBeamService { // Create Pipeline transformations final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka = - KafkaIO.<String, ActivePowerRecord>read() - .withBootstrapServers(uc1.bootstrapServer) - .withTopic(uc1.inputTopic) - .withKeyDeserializer(StringDeserializer.class) - .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, - AvroCoder.of(ActivePowerRecord.class)) - .withConsumerConfigUpdates(consumerConfig) - .withoutMetadata(); + new KafkaAggregatedPowerRecordReader(uc1.bootstrapServer, uc1.inputTopic, consumerConfig); final LogKeyValue logKeyValue = new LogKeyValue();