Skip to content
Snippets Groups Projects
Commit ac0e3dc5 authored by Sören Henning's avatar Sören Henning
Browse files

Code refactoring for Kafka Beam reader

parent 4083c0f6
No related branches found
No related tags found
No related merge requests found
Pipeline #6334 failed
package theodolite.commons.beam.kafka;
import titan.ccp.model.records.ActivePowerRecord;
public class ActivePowerRecordDeserializer extends TypedKafkaAvroDeserializer<ActivePowerRecord> {
}
package theodolite.commons.beam.kafka;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Map;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
*/
public class KafkaActivePowerRecordReader extends
PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
private static final long serialVersionUID = 2603286150183186115L;
private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
/**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/
public KafkaActivePowerRecordReader(final String bootstrapServer, final String inputTopic,
final Map<String, Object> consumerConfig) {
super();
if (bootstrapServer == null) {
throw new IllegalArgumentException("bootstrapServer is null");
}
if (inputTopic == null) {
throw new IllegalArgumentException("inputTopic is null");
}
// Check if boostrap server and inputTopic are defined
if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) {
throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
}
reader =
KafkaIO.<String, ActivePowerRecord>read()
.withBootstrapServers(bootstrapServer)
.withTopic(inputTopic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
AvroCoder.of(ActivePowerRecord.class))
.withConsumerConfigUpdates(consumerConfig)
.withoutMetadata();
}
@Override
public PCollection<KV<String, ActivePowerRecord>> expand(final PBegin input) {
return input.apply(this.reader);
}
}
package theodolite.commons.beam.kafka; package theodolite.commons.beam.kafka;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Map; import java.util.Map;
import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.io.kafka.KafkaIO;
...@@ -12,40 +11,37 @@ import org.apache.kafka.common.serialization.StringDeserializer; ...@@ -12,40 +11,37 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Simple {@link PTransform} that read from Kafka using {@link KafkaIO}. * Simple {@link PTransform} that reads from Kafka using {@link KafkaIO} with event time.
* Has additional a TimestampPolicy.
*/ */
public class KafkaActivePowerTimestampReader extends public class KafkaActivePowerTimestampReader
PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> { extends PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
private static final long serialVersionUID = 2603286150183186115L; private static final long serialVersionUID = 2603286150183186115L;
private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader; private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
/** /**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/ */
public KafkaActivePowerTimestampReader(final String bootstrapServer, final String inputTopic, public KafkaActivePowerTimestampReader(
final Map<String, Object> consumerConfig) { final String bootstrapServer,
final String inputTopic,
final Map<String, Object> consumerConfig) {
super(); super();
// Check if boostrap server and inputTopic are defined // Check if bootstrap server and inputTopic are defined
if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) { if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) {
throw new IllegalArgumentException("bootstrapServer or inputTopic missing"); throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
} }
reader = this.reader = KafkaIO.<String, ActivePowerRecord>read().withBootstrapServers(bootstrapServer)
KafkaIO.<String, ActivePowerRecord>read() .withTopic(inputTopic).withKeyDeserializer(StringDeserializer.class)
.withBootstrapServers(bootstrapServer) .withValueDeserializerAndCoder(
.withTopic(inputTopic) ActivePowerRecordDeserializer.class,
.withKeyDeserializer(StringDeserializer.class) AvroCoder.of(ActivePowerRecord.class))
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, .withConsumerConfigUpdates(consumerConfig)
AvroCoder.of(ActivePowerRecord.class)) .withTimestampPolicyFactory(
.withConsumerConfigUpdates(consumerConfig) (tp, previousWatermark) -> new EventTimePolicy(previousWatermark))
// Set TimeStampPolicy for event time .withoutMetadata();
.withTimestampPolicyFactory(
(tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark))
.withoutMetadata();
} }
@Override @Override
......
package theodolite.commons.beam.kafka;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.kafka.common.serialization.Deserializer;
public class TypedKafkaAvroDeserializer<T> implements Deserializer<T> {
private final KafkaAvroDeserializer deserializer;
public TypedKafkaAvroDeserializer() {
this.deserializer = new KafkaAvroDeserializer();
}
public TypedKafkaAvroDeserializer(SchemaRegistryClient client) {
this.deserializer = new KafkaAvroDeserializer(client);
}
public TypedKafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props) {
this.deserializer = new KafkaAvroDeserializer(client, props);
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@SuppressWarnings("unchecked")
@Override
public T deserialize(String s, byte[] bytes) {
return (T) this.deserializer.deserialize(s, bytes);
}
/**
* Pass a reader schema to get an Avro projection
*/
@SuppressWarnings("unchecked")
public T deserialize(String topic, byte[] bytes, Schema readerSchema) {
return (T) this.deserializer.deserialize(topic, bytes, readerSchema);
}
@Override
public void close() {
this.deserializer.close();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment