From e473b475b737d6eafdb93cff3ca51ef1ed5b4e3c Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Wed, 25 May 2022 16:46:15 +0200
Subject: [PATCH] Extend  KafkaPropertiesBuilder commons-hazelcastjet

---
 .../hazelcastjet/KafkaPropertiesBuilder.java  | 76 +++++++++++++++++--
 1 file changed, 70 insertions(+), 6 deletions(-)

diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/KafkaPropertiesBuilder.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/KafkaPropertiesBuilder.java
index 9bce60f57..ef089fd7d 100644
--- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/KafkaPropertiesBuilder.java
+++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/KafkaPropertiesBuilder.java
@@ -11,13 +11,81 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 /**
  * Generalized builder for Kafka properties.
  * Will always set AUTO_OFFSET_RESET_CONFIG to earliest
- *
  */
 public class KafkaPropertiesBuilder {
 
   private static final String TRUE = "true";
   private static final String AUTO_OFFSET_RESET_CONFIG = "earliest";
 
+  private final String specificAvroWriter = "specific.avro.writer";
+
+  private Properties readProperties;
+
+  private Properties writeProperties;
+
+  public KafkaPropertiesBuilder() {
+  }
+
+  /**
+   * Constructs a new PropertiesBuilder with defined default read and write properties.
+   * @param kafkaBootstrapServer default boostrap address property.
+   * @param schemaRegistryUrl default schema registry address property.
+   * @param jobName default job name property.
+   */
+  public KafkaPropertiesBuilder(final String kafkaBootstrapServer,
+                                final String schemaRegistryUrl,
+                                final String jobName) {
+
+    this.writeProperties = new Properties();
+    this.readProperties = new Properties();
+    readProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
+    readProperties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
+        schemaRegistryUrl);
+
+    writeProperties.putAll(readProperties);
+
+    readProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, jobName);
+    readProperties.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE);
+    readProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
+    readProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, TRUE);
+
+    writeProperties.setProperty(specificAvroWriter, TRUE);
+  }
+
+  /**
+   * Returns default read properties with the defined deserializers.
+   * @param keyDeserializer deserializer for the key.
+   * @param valueDeserializer deserializer for the value.
+   */
+  public Properties buildReadProperties(final String keyDeserializer,
+                                        final String valueDeserializer) {
+
+    final Properties props = new Properties();
+    props.putAll(this.readProperties);
+    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+        keyDeserializer);
+    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+        valueDeserializer);
+    return props;
+  }
+
+  /**
+   * Returns default read properties with the defined Serializers.
+   * @param keySerializer serializer for the key.
+   * @param valueSerializer serializer for the value.
+   */
+  public Properties buildWriteProperties(final String keySerializer,
+                                        final String valueSerializer) {
+
+    final Properties props = new Properties();
+    props.putAll(this.writeProperties);
+    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+        keySerializer);
+    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+        valueSerializer);
+    return props;
+  }
+
 
   /**
    * Builds Kafka Properties used for the UC4 Benchmark pipeline.
@@ -87,13 +155,9 @@ public class KafkaPropertiesBuilder {
     props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
         valueSerializer);
     props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
-    props.setProperty("specific.avro.writer", TRUE);
+    props.setProperty(specificAvroWriter, TRUE);
 
     return props;
   }
 
-
-
-
-
 }
-- 
GitLab