From 6e3c43d331c44b7e51e750e0d38125489011646e Mon Sep 17 00:00:00 2001
From: Sören Henning <soeren.henning@email.uni-kiel.de>
Date: Fri, 12 Mar 2021 17:57:04 +0100
Subject: [PATCH] Refactor KafkaConnectionFactory into shared KafkaConnectorFactory

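Move the Kafka connector factory from the uc2-application-flink module
to the shared flink-commons module and rename it from
KafkaConnectionFactory to KafkaConnectorFactory, so that all Flink-based
benchmark implementations can reuse it.

Besides the move, this refactoring:

* adds Javadoc to the public API and reorders parameters so that the
  topic always comes first,
* adds consumer overloads for KafkaDeserializationSchema and for
  Tuple2 key/value pairs deserialized via Kafka Serdes,
* accepts Serdes as SerializableSupplier instances instead of capturing
  the non-serializable Serde objects in lambdas,
* replaces the "bootstrap.servers" and "group.id" string literals with
  the corresponding ConsumerConfig constants, and
* clones the shared Kafka properties per connector via cloneProperties().
  The previous code passed this.kafkaProps to new Properties(...) as
  defaults only, which are invisible to any consumer of the Properties
  object that does not go through getProperty().

A minimal usage sketch follows; the application name, servers, topics,
and record type are illustrative placeholders (not part of this patch),
and the corresponding Flink and Kafka imports are assumed:

    final KafkaConnectorFactory factory = new KafkaConnectorFactory(
        "my-flink-app",           // consumer group id
        "localhost:9092",         // Kafka bootstrap servers
        true,                     // checkpointing enabled
        "http://localhost:8081"); // schema registry URL

    // Consume Avro records registered in the Confluent Schema Registry.
    // MyAvroRecord stands in for any generated SpecificRecord class.
    final FlinkKafkaConsumer<MyAvroRecord> consumer =
        factory.createConsumer("input", MyAvroRecord.class);

    // Produce Tuple2 results using Kafka Serdes supplied as
    // serializable method references.
    final FlinkKafkaProducer<Tuple2<String, Double>> producer =
        factory.createProducer(
            "output",
            Serdes::String,
            Serdes::Double,
            Types.TUPLE(Types.STRING, Types.DOUBLE));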
---
 .../commons/flink/KafkaConnectorFactory.java  | 154 ++++++++++++++++++
 .../application/KafkaConnectionFactory.java   |  84 ----------
 2 files changed, 154 insertions(+), 84 deletions(-)
 create mode 100644 theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/KafkaConnectorFactory.java
 delete mode 100644 theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/KafkaConnectionFactory.java

diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/KafkaConnectorFactory.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/KafkaConnectorFactory.java
new file mode 100644
index 000000000..55d73b0fb
--- /dev/null
+++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/KafkaConnectorFactory.java
@@ -0,0 +1,154 @@
+package theodolite.commons.flink;
+
+import java.time.Duration;
+import java.util.Properties;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serde;
+import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
+import theodolite.commons.flink.util.SerializableSupplier;
+
+/**
+ * A class for creating {@link FlinkKafkaConsumer} and {@link FlinkKafkaProducer}.
+ */
+public class KafkaConnectorFactory {
+
+  private static final Duration PRODUCER_TRANSACTION_TIMEOUT = Duration.ofMinutes(5);
+
+  private final Properties kafkaProps = new Properties();
+  private final boolean checkpointingEnabled;
+  private final String schemaRegistryUrl;
+
+  /**
+   * Create a new {@link KafkaConnectorFactory} from the provided parameters.
+   */
+  public KafkaConnectorFactory(
+      final String appName,
+      final String bootstrapServers,
+      final boolean checkpointingEnabled,
+      final String schemaRegistryUrl) {
+    this.checkpointingEnabled = checkpointingEnabled;
+    this.schemaRegistryUrl = schemaRegistryUrl;
+    this.kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    this.kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, appName);
+  }
+
+  /**
+   * Create a new {@link FlinkKafkaConsumer} that consumes data using a
+   * {@link DeserializationSchema}.
+   */
+  public <T> FlinkKafkaConsumer<T> createConsumer(final String topic,
+      final DeserializationSchema<T> deserializationSchema) {
+    return this.createBaseConsumer(
+        new FlinkKafkaConsumer<>(topic, deserializationSchema, this.cloneProperties()));
+  }
+
+  /**
+   * Create a new {@link FlinkKafkaConsumer} that consumes data using a
+   * {@link KafkaDeserializationSchema}.
+   */
+  public <T> FlinkKafkaConsumer<T> createConsumer(final String topic,
+      final KafkaDeserializationSchema<T> deserializationSchema) {
+    return this.createBaseConsumer(
+        new FlinkKafkaConsumer<>(topic, deserializationSchema, this.cloneProperties()));
+  }
+
+  /**
+   * Create a new {@link FlinkKafkaConsumer} that consumes {@link Tuple2}s using two Kafka
+   * {@link Serde}s.
+   */
+  public <K, V> FlinkKafkaConsumer<Tuple2<K, V>> createConsumer(
+      final String topic,
+      final SerializableSupplier<Serde<K>> kafkaKeySerde,
+      final SerializableSupplier<Serde<V>> kafkaValueSerde,
+      final TypeInformation<Tuple2<K, V>> typeInformation) {
+    return this.<Tuple2<K, V>>createConsumer(
+        topic,
+        new FlinkKafkaKeyValueSerde<>(
+            topic,
+            kafkaKeySerde,
+            kafkaValueSerde,
+            typeInformation));
+  }
+
+  /**
+   * Create a new {@link FlinkKafkaConsumer} that consumes from a topic associated with Confluent
+   * Schema Registry.
+   */
+  public <T extends SpecificRecord> FlinkKafkaConsumer<T> createConsumer(final String topic,
+      final Class<T> typeClass) {
+    // Maybe move to subclass for Confluent-Schema-Registry-specific things
+    final DeserializationSchema<T> deserializationSchema =
+        ConfluentRegistryAvroDeserializationSchema.forSpecific(typeClass, this.schemaRegistryUrl);
+    return this.createConsumer(topic, deserializationSchema);
+  }
+
+  private <T> FlinkKafkaConsumer<T> createBaseConsumer(final FlinkKafkaConsumer<T> baseConsumer) {
+    baseConsumer.setStartFromGroupOffsets();
+    if (this.checkpointingEnabled) {
+      baseConsumer.setCommitOffsetsOnCheckpoints(true); // TODO Validate if this is sensible
+    }
+    baseConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
+    return baseConsumer;
+  }
+
+
+  /**
+   * Create a new {@link FlinkKafkaProducer} that produces data using a
+   * {@link KafkaSerializationSchema}.
+   */
+  public <T> FlinkKafkaProducer<T> createProducer(final String topic,
+      final KafkaSerializationSchema<T> serializationSchema) {
+    final Properties producerProps = this.buildProducerProperties();
+    return this.createBaseProducer(new FlinkKafkaProducer<>(
+        topic, serializationSchema, producerProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
+  }
+
+  /**
+   * Create a new {@link FlinkKafkaProducer} that produces {@link Tuple2}s using two Kafka
+   * {@link Serde}s.
+   */
+  public <K, V> FlinkKafkaProducer<Tuple2<K, V>> createProducer(
+      final String topic,
+      final SerializableSupplier<Serde<K>> kafkaKeySerde,
+      final SerializableSupplier<Serde<V>> kafkaValueSerde,
+      final TypeInformation<Tuple2<K, V>> typeInformation) {
+    return this.createProducer(
+        topic,
+        new FlinkKafkaKeyValueSerde<>(
+            topic,
+            kafkaKeySerde,
+            kafkaValueSerde,
+            typeInformation));
+  }
+
+  private <T> FlinkKafkaProducer<T> createBaseProducer(final FlinkKafkaProducer<T> baseProducer) {
+    baseProducer.setWriteTimestampToKafka(true);
+    return baseProducer;
+  }
+
+  private Properties buildProducerProperties() {
+    final Properties producerProps = this.cloneProperties();
+    producerProps.setProperty(
+        ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
+        String.valueOf(PRODUCER_TRANSACTION_TIMEOUT.toMillis())); // TODO necessary?
+    return producerProps;
+  }
+
+  private Properties cloneProperties() {
+    final Properties props = new Properties();
+    props.putAll(this.kafkaProps);
+    return props;
+  }
+
+}
diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/KafkaConnectionFactory.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/KafkaConnectionFactory.java
deleted file mode 100644
index 8ace0d50c..000000000
--- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/KafkaConnectionFactory.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package theodolite.uc2.application;
-
-import java.time.Duration;
-import java.util.Properties;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
-import org.apache.kafka.common.serialization.Serde;
-import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
-
-/**
- * A class for creating {@link FlinkKafkaConsumer} and {@link FlinkKafkaProducer}.
- */
-public class KafkaConnectionFactory {
-
-  private static final Duration PRODUCER_TRANSACTION_TIMEOUT_MS = Duration.ofMinutes(5);
-
-  private final Properties kafkaProps = new Properties();
-  private final boolean checkpointingEnabled;
-  // private final long checkpointingIntervalMs;
-  private final String schemaRegistryUrl;
-
-  public KafkaConnectionFactory(
-      final String appName,
-      final String bootstrapServers,
-      final boolean checkpointingEnabled,
-      final String schemaRegistryUrl) {
-    this.checkpointingEnabled = checkpointingEnabled;
-    this.schemaRegistryUrl = schemaRegistryUrl;
-    this.kafkaProps.setProperty("bootstrap.servers", bootstrapServers);
-    this.kafkaProps.setProperty("group.id", appName);
-  }
-
-  public <T> FlinkKafkaConsumer<T> createConsumer(
-      final DeserializationSchema<T> deserializationSchema, final String topic) {
-    final FlinkKafkaConsumer<T> consumer =
-        new FlinkKafkaConsumer<>(topic, deserializationSchema, this.kafkaProps);
-    consumer.setStartFromGroupOffsets();
-    if (this.checkpointingEnabled) {
-      consumer.setCommitOffsetsOnCheckpoints(true);
-    }
-    consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
-    return consumer;
-  }
-
-  // Maybe move to subclass
-  public <T extends SpecificRecord> FlinkKafkaConsumer<T> createConsumer(final Class<T> tClass,
-      final String topic) {
-    final DeserializationSchema<T> deserializationSchema =
-        ConfluentRegistryAvroDeserializationSchema.forSpecific(tClass, this.schemaRegistryUrl);
-    return this.createConsumer(deserializationSchema, topic);
-  }
-
-  public <T> FlinkKafkaProducer<T> createProducer(
-      final KafkaSerializationSchema<T> serializationSchema, final String topic) {
-    final Properties producerProps = new Properties(this.kafkaProps);
-    producerProps.setProperty("transaction.timeout.ms",
-        String.valueOf(PRODUCER_TRANSACTION_TIMEOUT_MS.toMillis())); // TODO necessary?
-    final FlinkKafkaProducer<T> producer = new FlinkKafkaProducer<>(
-        topic, serializationSchema, producerProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
-    producer.setWriteTimestampToKafka(true);
-    return producer;
-  }
-
-  public <K, V> FlinkKafkaProducer<Tuple2<K, V>> createProducer(
-      final Serde<K> kafkaKeySerde, final Serde<V> kafkaValueSerde,
-      final TypeInformation<Tuple2<K, V>> typeInformation, final String topic) {
-    return this.createProducer(
-        new FlinkKafkaKeyValueSerde<>(
-            topic,
-            () -> kafkaKeySerde,
-            () -> kafkaValueSerde,
-            typeInformation),
-        topic);
-
-  }
-
-}
-- 
GitLab