From 4a8358536753698802c8e2d5b1a64a666d3ec90c Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Fri, 25 Mar 2022 23:47:33 +0100
Subject: [PATCH] Generalize KafkaPropertiesBuilder

Add a common class for building kafka properties
Add commit offset option in order to provide lag information
---
 .../hazelcastjet/KafkaPropertiesBuilder.java  | 88 +++++++++++++++++++
 .../uc1/hazelcastjet/HistoryService.java      |  2 +-
 .../hazelcastjet/Uc1HazelcastJetFactory.java  | 15 +++-
 .../Uc1KafkaPropertiesBuilder.java            |  2 +-
 .../uc2/hazelcastjet/HistoryService.java      |  4 +-
 .../hazelcastjet/Uc2HazelcastJetFactory.java  | 27 ++++--
 .../uc3/hazelcastjet/HistoryService.java      |  4 +-
 .../hazelcastjet/Uc3HazelcastJetFactory.java  | 27 ++++--
 .../uc4/hazelcastjet/HistoryService.java      |  2 +-
 .../hazelcastjet/Uc4HazelcastJetFactory.java  | 42 ++++++---
 10 files changed, 173 insertions(+), 40 deletions(-)
 create mode 100644 theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/KafkaPropertiesBuilder.java

diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/KafkaPropertiesBuilder.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/KafkaPropertiesBuilder.java
new file mode 100644
index 000000000..3c1783e79
--- /dev/null
+++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/KafkaPropertiesBuilder.java
@@ -0,0 +1,88 @@
+package rocks.theodolite.benchmarks.commons.hazelcastjet;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+
+public class KafkaPropertiesBuilder {
+
+  private static final String TRUE = "true";
+  private static final String AUTO_OFFSET_RESET_CONFIG = "earliest";
+
+
+  /**
+   * Builds Kafka Properties used for the UC4 Benchmark pipeline.
+   *
+   * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment.
+   * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment.
+   * @return A Kafka Properties Object containing the values needed for a Pipeline.
+   */
+  public Properties buildKafkaInputReadPropsFromEnv(final String kafkaBootstrapServerDefault,
+                                                    final String schemaRegistryUrlDefault,
+                                                    final String applicationName,
+                                                    final String keyDeserializer,
+                                                    final String valueDeserializer) {
+
+    final String kafkaBootstrapServers = Objects.requireNonNullElse(
+        System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
+        kafkaBootstrapServerDefault);
+    final String schemaRegistryUrl = Objects.requireNonNullElse(
+        System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
+        schemaRegistryUrlDefault);
+
+    final Properties props = new Properties();
+    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
+    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+        keyDeserializer);
+    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+        valueDeserializer);
+    props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+    props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE);
+    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
+
+    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,applicationName);
+    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,TRUE);
+
+    return props;
+  }
+
+  /**
+   * Builds Kafka Properties used for the UC4 Benchmark pipeline.
+   *
+   * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment.
+   * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4
+   *         Pipeline.
+   */
+  public Properties buildKafkaWritePropsFromEnv(final String kafkaBootstrapServerDefault,
+                                                final String schemaRegistryUrlDefault,
+                                                final String keySerializer,
+                                                final String valueSerializer) {
+
+    final String kafkaBootstrapServers = Objects.requireNonNullElse(
+        System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
+        kafkaBootstrapServerDefault);
+    final String schemaRegistryUrl = Objects.requireNonNullElse(
+        System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
+        schemaRegistryUrlDefault);
+
+    final Properties props = new Properties();
+    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
+    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+        keySerializer);
+    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+        valueSerializer);
+    props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+    props.setProperty("specific.avro.writer", TRUE);
+
+    return props;
+  }
+
+
+
+
+
+}
diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/HistoryService.java
index 4795c9e33..838482613 100644
--- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/HistoryService.java
+++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/HistoryService.java
@@ -54,7 +54,7 @@ public class HistoryService {
    */
   private void createHazelcastJetApplication() throws Exception { // NOPMD
     new Uc1HazelcastJetFactory()
-        .setPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT)
+        .setPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT,JOB_NAME)
         .setKafkaInputTopicFromEnv(KAFKA_TOPIC_DEFAULT)
         .buildUc1Pipeline()
         .buildUc1JetInstanceFromEnv(LOGGER, BOOTSTRAP_SERVER_DEFAULT, HZ_KUBERNETES_SERVICE_DNS_KEY)
diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1HazelcastJetFactory.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1HazelcastJetFactory.java
index d69c93f74..4a5c5dead 100644
--- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1HazelcastJetFactory.java
+++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1HazelcastJetFactory.java
@@ -3,11 +3,14 @@ package rocks.theodolite.benchmarks.uc1.hazelcastjet;
 import com.hazelcast.jet.JetInstance;
 import com.hazelcast.jet.config.JobConfig;
 import com.hazelcast.jet.pipeline.Pipeline;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import java.util.Objects;
 import java.util.Properties;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
 import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
 import rocks.theodolite.benchmarks.commons.hazelcastjet.JetInstanceBuilder;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.KafkaPropertiesBuilder;
 
 /**
  * A Hazelcast Jet factory which can build a Hazelcast Jet Instance and Pipeline for the UC1
@@ -131,12 +134,16 @@ public class Uc1HazelcastJetFactory {
    * @return The Uc1HazelcastJetBuilder factory with set kafkaPropertiesForPipeline.
    */
   public Uc1HazelcastJetFactory setPropertiesFromEnv(final String bootstrapServersDefault, // NOPMD
-      final String schemaRegistryUrlDefault) {
+                                                     final String schemaRegistryUrlDefault,
+                                                     final String jobName) {
     // Use KafkaPropertiesBuilder to build a properties object used for kafka
-    final Uc1KafkaPropertiesBuilder propsBuilder = new Uc1KafkaPropertiesBuilder();
+    final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
     final Properties kafkaProps =
-        propsBuilder.buildKafkaPropsFromEnv(bootstrapServersDefault,
-            schemaRegistryUrlDefault);
+        propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault,
+            schemaRegistryUrlDefault,
+            jobName,
+            StringDeserializer.class.getCanonicalName(),
+            KafkaAvroDeserializer.class.getCanonicalName());
     this.kafkaPropertiesForPipeline = kafkaProps;
     return this;
   }
diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java
index e753afd99..c5d38807d 100644
--- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java
+++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java
@@ -20,7 +20,7 @@ public class Uc1KafkaPropertiesBuilder {
   /**
    * Builds Kafka Properties used for the UC1 Benchmark pipeline.
    *
-   * @param kafkaBootstrapServerDefault Default bootstrap server if not net by envrionment.
+   * @param kafkaBootstrapServerDefault Default bootstrap server if not net by environment.
    * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment.
    * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC1
    *         Pipeline.
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java
index ea2056ce0..f382978b7 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java
@@ -57,8 +57,8 @@ public class HistoryService {
    */
   private void createHazelcastJetApplication() throws Exception { // NOPMD
     new Uc2HazelcastJetFactory()
-        .setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT)
-        .setWritePropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT)
+        .setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT,JOB_NAME)
+        .setWritePropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT,SCHEMA_REGISTRY_URL_DEFAULT)
         .setKafkaInputTopicFromEnv(KAFKA_INPUT_TOPIC_DEFAULT)
         .setKafkaOutputTopicFromEnv(KAFKA_OUTPUT_TOPIC_DEFAULT)
         .setDownsampleIntervalFromEnv(DOWNSAMPLE_INTERVAL_DEFAULT_MS)
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2HazelcastJetFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2HazelcastJetFactory.java
index ca4f95712..143b154f3 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2HazelcastJetFactory.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2HazelcastJetFactory.java
@@ -4,11 +4,15 @@ import com.google.common.math.StatsAccumulator;
 import com.hazelcast.jet.JetInstance;
 import com.hazelcast.jet.config.JobConfig;
 import com.hazelcast.jet.pipeline.Pipeline;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import java.util.Objects;
 import java.util.Properties;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
 import rocks.theodolite.benchmarks.commons.hazelcastjet.JetInstanceBuilder;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.KafkaPropertiesBuilder;
 import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSerializer;
 
 /**
@@ -176,13 +180,17 @@ public class Uc2HazelcastJetFactory {
    * @return The Uc2HazelcastJetBuilder factory with set kafkaReadPropertiesForPipeline.
    */
   public Uc2HazelcastJetFactory setReadPropertiesFromEnv(// NOPMD
-      final String bootstrapServersDefault,
-      final String schemaRegistryUrlDefault) {
+                                                         final String bootstrapServersDefault,
+                                                         final String schemaRegistryUrlDefault,
+                                                         final String jobName) {
     // Use KafkaPropertiesBuilder to build a properties object used for kafka
-    final Uc2KafkaPropertiesBuilder propsBuilder = new Uc2KafkaPropertiesBuilder();
+    final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
     final Properties kafkaReadProps =
-        propsBuilder.buildKafkaReadPropsFromEnv(bootstrapServersDefault,
-            schemaRegistryUrlDefault);
+        propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault,
+            schemaRegistryUrlDefault,
+            jobName,
+            StringDeserializer.class.getCanonicalName(),
+            KafkaAvroDeserializer.class.getCanonicalName());
     this.kafkaReadPropsForPipeline = kafkaReadProps;
     return this;
   }
@@ -195,11 +203,14 @@ public class Uc2HazelcastJetFactory {
    * @return The Uc2HazelcastJetBuilder factory with set kafkaWritePropertiesForPipeline.
    */
   public Uc2HazelcastJetFactory setWritePropertiesFromEnv(// NOPMD
-      final String bootstrapServersDefault) {
+      final String bootstrapServersDefault, final String schemaRegistryUrlDefault) {
     // Use KafkaPropertiesBuilder to build a properties object used for kafka
-    final Uc2KafkaPropertiesBuilder propsBuilder = new Uc2KafkaPropertiesBuilder();
+    final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
     final Properties kafkaWriteProps =
-        propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault);
+        propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault,
+            schemaRegistryUrlDefault,
+            StringSerializer.class.getCanonicalName(),
+            StringSerializer.class.getCanonicalName());
     this.kafkaWritePropsForPipeline = kafkaWriteProps;
     return this;
   }
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java
index 9b02da5b6..ecf38bd6c 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java
@@ -58,8 +58,8 @@ public class HistoryService {
    */
   private void createHazelcastJetApplication() throws Exception { // NOPMD
     new Uc3HazelcastJetFactory()
-        .setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT)
-        .setWritePropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT)
+        .setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT, JOB_NAME)
+        .setWritePropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT)
         .setKafkaInputTopicFromEnv(KAFKA_INPUT_TOPIC_DEFAULT)
         .setKafkaOutputTopicFromEnv(KAFKA_OUTPUT_TOPIC_DEFAULT)
         .setWindowSizeInSecondsFromEnv(WINDOW_SIZE_IN_SECONDS_DEFAULT)
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3HazelcastJetFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3HazelcastJetFactory.java
index 2e869056c..be6d70d27 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3HazelcastJetFactory.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3HazelcastJetFactory.java
@@ -3,11 +3,15 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet;
 import com.hazelcast.jet.JetInstance;
 import com.hazelcast.jet.config.JobConfig;
 import com.hazelcast.jet.pipeline.Pipeline;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import java.util.Objects;
 import java.util.Properties;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
 import rocks.theodolite.benchmarks.commons.hazelcastjet.JetInstanceBuilder;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.KafkaPropertiesBuilder;
 import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey;
 import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKeySerializer;
 
@@ -188,13 +192,17 @@ public class Uc3HazelcastJetFactory { // NOPMD
    * @return The Uc3HazelcastJetBuilder factory with set kafkaReadPropertiesForPipeline.
    */
   public Uc3HazelcastJetFactory setReadPropertiesFromEnv(// NOPMD
-      final String bootstrapServersDefault,
-      final String schemaRegistryUrlDefault) {
+                                                         final String bootstrapServersDefault,
+                                                         final String schemaRegistryUrlDefault,
+                                                         final String jobName) {
     // Use KafkaPropertiesBuilder to build a properties object used for kafka
-    final Uc3KafkaPropertiesBuilder propsBuilder = new Uc3KafkaPropertiesBuilder();
+    final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
     final Properties kafkaReadProps =
-        propsBuilder.buildKafkaReadPropsFromEnv(bootstrapServersDefault,
-            schemaRegistryUrlDefault);
+        propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault,
+            schemaRegistryUrlDefault,
+            jobName,
+            StringDeserializer.class.getCanonicalName(),
+            KafkaAvroDeserializer.class.getCanonicalName());
     this.kafkaReadPropsForPipeline = kafkaReadProps;
     return this;
   }
@@ -207,11 +215,14 @@ public class Uc3HazelcastJetFactory { // NOPMD
    * @return The Uc3HazelcastJetBuilder factory with set kafkaWritePropertiesForPipeline.
    */
   public Uc3HazelcastJetFactory setWritePropertiesFromEnv(// NOPMD
-      final String bootstrapServersDefault) {
+      final String bootstrapServersDefault, final String schemaRegistryUrlDefault) {
     // Use KafkaPropertiesBuilder to build a properties object used for kafka
-    final Uc3KafkaPropertiesBuilder propsBuilder = new Uc3KafkaPropertiesBuilder();
+    final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
     final Properties kafkaWriteProps =
-        propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault);
+        propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault,
+            schemaRegistryUrlDefault,
+            StringSerializer.class.getCanonicalName(),
+            StringSerializer.class.getCanonicalName());
     this.kafkaWritePropsForPipeline = kafkaWriteProps;
     return this;
   }
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java
index cd0b961c7..419c25fec 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java
@@ -59,7 +59,7 @@ public class HistoryService {
    */
   private void createHazelcastJetApplication() throws Exception { // NOPMD
     new Uc4HazelcastJetFactory()
-        .setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT)
+        .setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT,JOB_NAME)
         .setWritePropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT)
         .setKafkaInputTopicFromEnv(KAFKA_INPUT_TOPIC_DEFAULT)
         .setKafkaOutputTopicFromEnv(KAFKA_OUTPUT_TOPIC_DEFAULT)
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java
index a4dc2d823..a352ec6ca 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java
@@ -3,11 +3,16 @@ package rocks.theodolite.benchmarks.uc4.hazelcastjet;
 import com.hazelcast.jet.JetInstance;
 import com.hazelcast.jet.config.JobConfig;
 import com.hazelcast.jet.pipeline.Pipeline;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import java.util.Objects;
 import java.util.Properties;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
 import rocks.theodolite.benchmarks.commons.hazelcastjet.JetInstanceBuilder;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.KafkaPropertiesBuilder;
+import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializer;
 import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ImmutableSensorRegistryUc4Serializer;
 import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKey;
 import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKeySerializer;
@@ -15,15 +20,13 @@ import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroup;
 import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroupSerializer;
 import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
 
-
-
 /**
  * A Hazelcast Jet factory which can build a Hazelcast Jet Instance and Pipeline for the UC4
  * benchmark and lets you start the Hazelcast Jet job. The JetInstance can be built directly as the
  * Hazelcast Config is managed internally. In order to build the Pipeline, you first have to build
- * the Read and Write Propertiesand set the input, output, and configuration topic. This can be done
- * using internal functions of this factory. Outside data only refers to custom values or default
- * values in case data of the environment cannot the fetched.
+ * the Read and Write Properties and set the input, output, and configuration topic. This can be
+ * done using internal functions of this factory. Outside data only refers to custom values or
+ * default values in case data of the environment cannot the fetched.
  */
 public class Uc4HazelcastJetFactory {
 
@@ -193,19 +196,32 @@ public class Uc4HazelcastJetFactory {
    * @return The Uc4HazelcastJetBuilder factory with set kafkaReadPropertiesForPipeline.
    */
   public Uc4HazelcastJetFactory setReadPropertiesFromEnv(// NOPMD
-      final String bootstrapServersDefault,
-      final String schemaRegistryUrlDefault) {
+                                                         final String bootstrapServersDefault,
+                                                         final String schemaRegistryUrlDefault,
+                                                         final String jobName) {
     // Use KafkaPropertiesBuilder to build a properties object used for kafka
-    final Uc4KafkaPropertiesBuilder propsBuilder = new Uc4KafkaPropertiesBuilder();
+    final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
+
     final Properties kafkaInputReadProps =
         propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault,
-            schemaRegistryUrlDefault);
+            schemaRegistryUrlDefault, jobName,
+            StringSerializer.class.getCanonicalName(),
+            StringSerializer.class.getCanonicalName());
+
     final Properties kafkaConfigReadProps =
-        propsBuilder.buildKafkaConfigReadPropsFromEnv(bootstrapServersDefault,
-            schemaRegistryUrlDefault);
+        propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault,
+            schemaRegistryUrlDefault,
+            jobName,
+            EventDeserializer.class.getCanonicalName(),
+            StringDeserializer.class.getCanonicalName());
+
     final Properties kafkaAggregationReadProps =
-        propsBuilder.buildKafkaAggregationReadPropsFromEnv(bootstrapServersDefault,
-            schemaRegistryUrlDefault);
+        propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault,
+            schemaRegistryUrlDefault,
+            jobName,
+            StringDeserializer.class.getCanonicalName(),
+            KafkaAvroDeserializer.class.getCanonicalName());
+
     this.kafkaInputReadPropsForPipeline = kafkaInputReadProps;
     this.kafkaConfigPropsForPipeline = kafkaConfigReadProps;
     this.kafkaFeedbackPropsForPipeline = kafkaAggregationReadProps;
-- 
GitLab