diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java index 15cd024fe60cf213e8798420b3febef3ae0ca50c..c53dde3d5f4b7d18822c916a637c356b898fe2cd 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java @@ -1,10 +1,11 @@ package theodolite.commons.beam.kafka; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer; import org.apache.kafka.common.serialization.Deserializer; import titan.ccp.model.records.ActivePowerRecord; /** * A Kafka {@link Deserializer} for typed Schema Registry {@link ActivePowerRecord}. */ -public class ActivePowerRecordDeserializer extends TypedKafkaAvroDeserializer<ActivePowerRecord> { +public class ActivePowerRecordDeserializer extends SpecificAvroDeserializer<ActivePowerRecord> { } diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/TypedKafkaAvroDeserializer.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/TypedKafkaAvroDeserializer.java deleted file mode 100644 index c08e8b8ae902d65f4a60bd58a69b06879435e417..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/TypedKafkaAvroDeserializer.java +++ /dev/null @@ -1,59 +0,0 @@ -package theodolite.commons.beam.kafka; - -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import java.util.Map; -import org.apache.avro.Schema; -import org.apache.kafka.common.serialization.Deserializer; - -/** - * A Kafka {@link Deserializer} for the Confluent Schema Registry, similar to - * {@link KafkaAvroDeserializer} but for typed records. - * - * @param <T> Type to be deserialized into. - */ -public class TypedKafkaAvroDeserializer<T> implements Deserializer<T> { - - private final KafkaAvroDeserializer deserializer; - - public TypedKafkaAvroDeserializer() { - this.deserializer = new KafkaAvroDeserializer(); - } - - public TypedKafkaAvroDeserializer(final SchemaRegistryClient client) { - this.deserializer = new KafkaAvroDeserializer(client); - } - - public TypedKafkaAvroDeserializer(final SchemaRegistryClient client, final Map<String, ?> props) { - this.deserializer = new KafkaAvroDeserializer(client, props); - } - - public TypedKafkaAvroDeserializer(final KafkaAvroDeserializer kafkaAvroDeserializer) { - this.deserializer = kafkaAvroDeserializer; - } - - @Override - public void configure(final Map<String, ?> configs, final boolean isKey) { - this.deserializer.configure(configs, isKey); - } - - @SuppressWarnings("unchecked") - @Override - public T deserialize(final String s, final byte[] bytes) { - return (T) this.deserializer.deserialize(s, bytes); - } - - /** - * Pass a reader schema to get an Avro projection. - */ - @SuppressWarnings("unchecked") - public T deserialize(final String topic, final byte[] bytes, final Schema readerSchema) { - return (T) this.deserializer.deserialize(topic, bytes, readerSchema); - } - - @Override - public void close() { - this.deserializer.close(); - } - -}