Commit 6e3c43d3 authored by Sören Henning

Refactor KafkaConnectorFactory

parent 26bab409
1 merge request: !90 Migrate Flink benchmark implementation
Pipeline #2284 failed
package theodolite.commons.flink;
import java.time.Duration;
import java.util.Properties;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serde;
import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
import theodolite.commons.flink.util.SerializableSupplier;
/**
* A class for creating {@link FlinkKafkaConsumer} and {@link FlinkKafkaProducer}.
*/
public class KafkaConnectorFactory {
private static final Duration PRODUCER_TRANSACTION_TIMEOUT = Duration.ofMinutes(5);
private final Properties kafkaProps = new Properties();
private final boolean checkpointingEnabled;
private final String schemaRegistryUrl;
/**
* Create a new {@link KafkaConnectorFactory} from the provided parameters.
*/
public KafkaConnectorFactory(
final String appName,
final String bootstrapServers,
final boolean checkpointingEnabled,
final String schemaRegistryUrl) {
this.checkpointingEnabled = checkpointingEnabled;
this.schemaRegistryUrl = schemaRegistryUrl;
this.kafkaProps.setProperty("bootstrap.servers", bootstrapServers);
this.kafkaProps.setProperty("group.id", appName);
this.kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
this.kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, appName);
}
/**
* Create a new {@link FlinkKafkaConsumer} that consumes data using a
* {@link DeserializationSchema}.
*/
public <T> FlinkKafkaConsumer<T> createConsumer(final String topic,
final DeserializationSchema<T> deserializationSchema) {
return this.createBaseConsumer(
new FlinkKafkaConsumer<>(topic, deserializationSchema, this.cloneProperties()));
}
/**
* Create a new {@link FlinkKafkaConsumer} that consumes data using a
* {@link KafkaDeserializationSchema}.
*/
public <T> FlinkKafkaConsumer<T> createConsumer(final String topic,
final KafkaDeserializationSchema<T> deserializationSchema) {
return this.createBaseConsumer(
new FlinkKafkaConsumer<>(topic, deserializationSchema, this.cloneProperties()));
}
/**
* Create a new {@link FlinkKafkaConsumer} that consumes {@link Tuple2}s using two Kafka
* {@link Serde}s.
*/
public <K, V> FlinkKafkaConsumer<Tuple2<K, V>> createConsumer(
final String topic,
final SerializableSupplier<Serde<K>> kafkaKeySerde,
final SerializableSupplier<Serde<V>> kafkaValueSerde,
final TypeInformation<Tuple2<K, V>> typeInformation) {
return this.<Tuple2<K, V>>createConsumer(
topic,
new FlinkKafkaKeyValueSerde<>(
topic,
kafkaKeySerde,
kafkaValueSerde,
typeInformation));
}
/**
* Create a new {@link FlinkKafkaConsumer} that consumes from a topic associated with Confluent
* Schema Registry.
*/
public <T extends SpecificRecord> FlinkKafkaConsumer<T> createConsumer(final String topic,
final Class<T> typeClass) {
// Maybe move to subclass for Confluent-Schema-Registry-specific things
final DeserializationSchema<T> deserializationSchema =
ConfluentRegistryAvroDeserializationSchema.forSpecific(typeClass, this.schemaRegistryUrl);
return this.createConsumer(topic, deserializationSchema);
}
private <T> FlinkKafkaConsumer<T> createBaseConsumer(final FlinkKafkaConsumer<T> baseConsumer) {
baseConsumer.setStartFromGroupOffsets();
if (this.checkpointingEnabled) {
baseConsumer.setCommitOffsetsOnCheckpoints(true); // TODO Validate if this is sensible
}
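// Assumes record timestamps are monotonically increasing within each Kafka partition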
baseConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
return baseConsumer;
}
/**
* Create a new {@link FlinkKafkaProducer} that produces data using a
* {@link KafkaSerializationSchema}.
*/
public <T> FlinkKafkaProducer<T> createProducer(final String topic,
final KafkaSerializationSchema<T> serializationSchema) {
final Properties producerProps = this.buildProducerProperties();
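// AT_LEAST_ONCE flushes pending records on each checkpoint but does not use Kafka
// transactions, so duplicates are possible after a failure and recovery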
return this.createBaseProducer(new FlinkKafkaProducer<>(
topic, serializationSchema, producerProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
}
/**
* Create a new {@link FlinkKafkaProducer} that produces {@link Tuple2}s using two Kafka
* {@link Serde}s.
*/
public <K, V> FlinkKafkaProducer<Tuple2<K, V>> createProducer(
final String topic,
final SerializableSupplier<Serde<K>> kafkaKeySerde,
final SerializableSupplier<Serde<V>> kafkaValueSerde,
final TypeInformation<Tuple2<K, V>> typeInformation) {
return this.createProducer(
topic,
new FlinkKafkaKeyValueSerde<>(
topic,
kafkaKeySerde,
kafkaValueSerde,
typeInformation));
}
private <T> FlinkKafkaProducer<T> createBaseProducer(final FlinkKafkaProducer<T> baseProducer) {
baseProducer.setWriteTimestampToKafka(true);
return baseProducer;
}
private Properties buildProducerProperties() {
final Properties producerProps = this.cloneProperties();
producerProps.setProperty(
ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
String.valueOf(PRODUCER_TRANSACTION_TIMEOUT.toMillis())); // TODO necessary?
return producerProps;
}
private Properties cloneProperties() {
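// Copy entries into a fresh Properties object rather than passing this.kafkaProps as
// defaults to new Properties(...): default entries are not visible through the Map view
// that the Kafka clients iterate over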
final Properties props = new Properties();
props.putAll(this.kafkaProps);
return props;
}
}
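
For reference, a minimal usage sketch of the refactored factory in a Flink job (not part of this commit; the job class, topic name, connection parameters, and the Avro record type ActivePowerRecord are illustrative assumptions):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import theodolite.commons.flink.KafkaConnectorFactory;
import titan.ccp.model.records.ActivePowerRecord; // illustrative Avro record type (assumption)

public class ExampleFlinkJob { // hypothetical job class
  public static void main(final String[] args) throws Exception {
    final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
        "example-app",            // application name, also used as consumer group id (assumption)
        "localhost:9092",         // Kafka bootstrap servers (assumption)
        true,                     // checkpointing enabled
        "http://localhost:8081"); // Confluent Schema Registry URL (assumption)

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000); // matches checkpointingEnabled = true above

    // Consumer for Avro records deserialized via Confluent Schema Registry
    final FlinkKafkaConsumer<ActivePowerRecord> source =
        kafkaConnector.createConsumer("input", ActivePowerRecord.class);

    env.addSource(source).name("[Kafka] input").print();
    env.execute("example-job");
  }
}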