diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java index 2ec0f464d60e3e1166ec32145cf80e92a9aba241..15cd024fe60cf213e8798420b3febef3ae0ca50c 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java @@ -1,6 +1,10 @@ package theodolite.commons.beam.kafka; +import org.apache.kafka.common.serialization.Deserializer; import titan.ccp.model.records.ActivePowerRecord; +/** + * A Kafka {@link Deserializer} for typed Schema Registry {@link ActivePowerRecord}. + */ public class ActivePowerRecordDeserializer extends TypedKafkaAvroDeserializer<ActivePowerRecord> { } diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/TypedKafkaAvroDeserializer.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/TypedKafkaAvroDeserializer.java index b213aa0bc8773604eb05e11c0546afcca3c137ac..c08e8b8ae902d65f4a60bd58a69b06879435e417 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/TypedKafkaAvroDeserializer.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/TypedKafkaAvroDeserializer.java @@ -6,44 +6,54 @@ import java.util.Map; import org.apache.avro.Schema; import org.apache.kafka.common.serialization.Deserializer; +/** + * A Kafka {@link Deserializer} for the Confluent Schema Registry, similar to + * {@link KafkaAvroDeserializer} but for typed records. + * + * @param <T> Type to be deserialized into. + */ public class TypedKafkaAvroDeserializer<T> implements Deserializer<T> { - private final KafkaAvroDeserializer deserializer; - - public TypedKafkaAvroDeserializer() { - this.deserializer = new KafkaAvroDeserializer(); - } - - public TypedKafkaAvroDeserializer(SchemaRegistryClient client) { - this.deserializer = new KafkaAvroDeserializer(client); - } - - public TypedKafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props) { - this.deserializer = new KafkaAvroDeserializer(client, props); - } - - @Override - public void configure(Map<String, ?> configs, boolean isKey) { - - } - - @SuppressWarnings("unchecked") - @Override - public T deserialize(String s, byte[] bytes) { - return (T) this.deserializer.deserialize(s, bytes); - } - - /** - * Pass a reader schema to get an Avro projection - */ - @SuppressWarnings("unchecked") - public T deserialize(String topic, byte[] bytes, Schema readerSchema) { - return (T) this.deserializer.deserialize(topic, bytes, readerSchema); - } - - @Override - public void close() { - this.deserializer.close(); - } + private final KafkaAvroDeserializer deserializer; + + public TypedKafkaAvroDeserializer() { + this.deserializer = new KafkaAvroDeserializer(); + } + + public TypedKafkaAvroDeserializer(final SchemaRegistryClient client) { + this.deserializer = new KafkaAvroDeserializer(client); + } + + public TypedKafkaAvroDeserializer(final SchemaRegistryClient client, final Map<String, ?> props) { + this.deserializer = new KafkaAvroDeserializer(client, props); + } + + public TypedKafkaAvroDeserializer(final KafkaAvroDeserializer kafkaAvroDeserializer) { + this.deserializer = kafkaAvroDeserializer; + } + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + this.deserializer.configure(configs, isKey); + } + + @SuppressWarnings("unchecked") + @Override + public T deserialize(final String s, final byte[] bytes) { + return (T) this.deserializer.deserialize(s, bytes); + } + + /** + * Pass a reader schema to get an Avro projection. + */ + @SuppressWarnings("unchecked") + public T deserialize(final String topic, final byte[] bytes, final Schema readerSchema) { + return (T) this.deserializer.deserialize(topic, bytes, readerSchema); + } + + @Override + public void close() { + this.deserializer.close(); + } }