From 7c7d06342070302c1d663971c477f41e309a7bd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Mon, 31 Jan 2022 14:53:58 +0100 Subject: [PATCH] Fix code quality issues --- .../kafka/ActivePowerRecordDeserializer.java | 4 + .../kafka/TypedKafkaAvroDeserializer.java | 84 +++++++++++-------- 2 files changed, 51 insertions(+), 37 deletions(-) diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java index 2ec0f464d..15cd024fe 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java @@ -1,6 +1,10 @@ package theodolite.commons.beam.kafka; +import org.apache.kafka.common.serialization.Deserializer; import titan.ccp.model.records.ActivePowerRecord; +/** + * A Kafka {@link Deserializer} for typed Schema Registry {@link ActivePowerRecord}. + */ public class ActivePowerRecordDeserializer extends TypedKafkaAvroDeserializer<ActivePowerRecord> { } diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/TypedKafkaAvroDeserializer.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/TypedKafkaAvroDeserializer.java index b213aa0bc..c08e8b8ae 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/TypedKafkaAvroDeserializer.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/TypedKafkaAvroDeserializer.java @@ -6,44 +6,54 @@ import java.util.Map; import org.apache.avro.Schema; import org.apache.kafka.common.serialization.Deserializer; +/** + * A Kafka {@link Deserializer} for the Confluent Schema Registry, similar to + * {@link KafkaAvroDeserializer} but for typed records. + * + * @param <T> Type to be deserialized into. + */ public class TypedKafkaAvroDeserializer<T> implements Deserializer<T> { - private final KafkaAvroDeserializer deserializer; - - public TypedKafkaAvroDeserializer() { - this.deserializer = new KafkaAvroDeserializer(); - } - - public TypedKafkaAvroDeserializer(SchemaRegistryClient client) { - this.deserializer = new KafkaAvroDeserializer(client); - } - - public TypedKafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props) { - this.deserializer = new KafkaAvroDeserializer(client, props); - } - - @Override - public void configure(Map<String, ?> configs, boolean isKey) { - - } - - @SuppressWarnings("unchecked") - @Override - public T deserialize(String s, byte[] bytes) { - return (T) this.deserializer.deserialize(s, bytes); - } - - /** - * Pass a reader schema to get an Avro projection - */ - @SuppressWarnings("unchecked") - public T deserialize(String topic, byte[] bytes, Schema readerSchema) { - return (T) this.deserializer.deserialize(topic, bytes, readerSchema); - } - - @Override - public void close() { - this.deserializer.close(); - } + private final KafkaAvroDeserializer deserializer; + + public TypedKafkaAvroDeserializer() { + this.deserializer = new KafkaAvroDeserializer(); + } + + public TypedKafkaAvroDeserializer(final SchemaRegistryClient client) { + this.deserializer = new KafkaAvroDeserializer(client); + } + + public TypedKafkaAvroDeserializer(final SchemaRegistryClient client, final Map<String, ?> props) { + this.deserializer = new KafkaAvroDeserializer(client, props); + } + + public TypedKafkaAvroDeserializer(final KafkaAvroDeserializer kafkaAvroDeserializer) { + this.deserializer = kafkaAvroDeserializer; + } + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + this.deserializer.configure(configs, isKey); + } + + @SuppressWarnings("unchecked") + @Override + public T deserialize(final String s, final byte[] bytes) { + return (T) this.deserializer.deserialize(s, bytes); + } + + /** + * Pass a reader schema to get an Avro projection. + */ + @SuppressWarnings("unchecked") + public T deserialize(final String topic, final byte[] bytes, final Schema readerSchema) { + return (T) this.deserializer.deserialize(topic, bytes, readerSchema); + } + + @Override + public void close() { + this.deserializer.close(); + } } -- GitLab