Skip to content
Snippets Groups Projects
Commit 7c7d0634 authored by Sören Henning's avatar Sören Henning
Browse files

Fix code quality issues

parent 2b3bc270
No related branches found
No related tags found
No related merge requests found
Pipeline #6337 passed
package theodolite.commons.beam.kafka; package theodolite.commons.beam.kafka;
import org.apache.kafka.common.serialization.Deserializer;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/**
* A Kafka {@link Deserializer} for typed Schema Registry {@link ActivePowerRecord}.
*/
public class ActivePowerRecordDeserializer extends TypedKafkaAvroDeserializer<ActivePowerRecord> { public class ActivePowerRecordDeserializer extends TypedKafkaAvroDeserializer<ActivePowerRecord> {
} }
...@@ -6,44 +6,54 @@ import java.util.Map; ...@@ -6,44 +6,54 @@ import java.util.Map;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
/**
* A Kafka {@link Deserializer} for the Confluent Schema Registry, similar to
* {@link KafkaAvroDeserializer} but for typed records.
*
* @param <T> Type to be deserialized into.
*/
public class TypedKafkaAvroDeserializer<T> implements Deserializer<T> { public class TypedKafkaAvroDeserializer<T> implements Deserializer<T> {
private final KafkaAvroDeserializer deserializer; private final KafkaAvroDeserializer deserializer;
public TypedKafkaAvroDeserializer() { public TypedKafkaAvroDeserializer() {
this.deserializer = new KafkaAvroDeserializer(); this.deserializer = new KafkaAvroDeserializer();
} }
public TypedKafkaAvroDeserializer(SchemaRegistryClient client) { public TypedKafkaAvroDeserializer(final SchemaRegistryClient client) {
this.deserializer = new KafkaAvroDeserializer(client); this.deserializer = new KafkaAvroDeserializer(client);
} }
public TypedKafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props) { public TypedKafkaAvroDeserializer(final SchemaRegistryClient client, final Map<String, ?> props) {
this.deserializer = new KafkaAvroDeserializer(client, props); this.deserializer = new KafkaAvroDeserializer(client, props);
} }
@Override public TypedKafkaAvroDeserializer(final KafkaAvroDeserializer kafkaAvroDeserializer) {
public void configure(Map<String, ?> configs, boolean isKey) { this.deserializer = kafkaAvroDeserializer;
}
}
@Override
@SuppressWarnings("unchecked") public void configure(final Map<String, ?> configs, final boolean isKey) {
@Override this.deserializer.configure(configs, isKey);
public T deserialize(String s, byte[] bytes) { }
return (T) this.deserializer.deserialize(s, bytes);
} @SuppressWarnings("unchecked")
@Override
/** public T deserialize(final String s, final byte[] bytes) {
* Pass a reader schema to get an Avro projection return (T) this.deserializer.deserialize(s, bytes);
*/ }
@SuppressWarnings("unchecked")
public T deserialize(String topic, byte[] bytes, Schema readerSchema) { /**
return (T) this.deserializer.deserialize(topic, bytes, readerSchema); * Pass a reader schema to get an Avro projection.
} */
@SuppressWarnings("unchecked")
@Override public T deserialize(final String topic, final byte[] bytes, final Schema readerSchema) {
public void close() { return (T) this.deserializer.deserialize(topic, bytes, readerSchema);
this.deserializer.close(); }
}
@Override
public void close() {
this.deserializer.close();
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment