From 1ab165344f79d9dc8c44a2712c1c9de79e6bf98d Mon Sep 17 00:00:00 2001
From: MaxEmerold <wiedenhoeft.max@gmail.com>
Date: Tue, 7 Dec 2021 11:03:30 +0100
Subject: [PATCH] Fix UC4 Hazelcast Jet implementation

- Added StreamSerializer for ImmutableSensorRegistry
- Changed Map.Entry to Util.Entry for serialization support
- Pipeline now applicable for multiple running JetInstances
---
 .../uc3/applicationold/ClusterConfig.java     |  76 -----
 .../uc3/applicationold/ConfigurationKeys.java |  48 ---
 .../uc3/applicationold/HistoryService.java    | 303 ------------------
 .../application/Uc4HazelcastJetFactory.java   |   6 +-
 .../uc4/application/Uc4PipelineBuilder.java   |  10 +-
 .../ImmutableSensorRegistryUc4Serializer.java |  36 +++
 6 files changed, 46 insertions(+), 433 deletions(-)
 delete mode 100644 theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/applicationold/ClusterConfig.java
 delete mode 100644 theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/applicationold/ConfigurationKeys.java
 delete mode 100644 theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/applicationold/HistoryService.java
 create mode 100644 theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ImmutableSensorRegistryUc4Serializer.java

diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/applicationold/ClusterConfig.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/applicationold/ClusterConfig.java
deleted file mode 100644
index 923749478..000000000
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/applicationold/ClusterConfig.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package theodolite.uc3.applicationold;
-
-/**
- * Configuration of a load generator cluster.
- */
-public final class ClusterConfig {
-
-  private static final int PORT_DEFAULT = 5701;
-  private static final String CLUSTER_NAME_PREFIX_DEFAULT = "theodolite-load-generation";
-
-  private final String bootstrapServer;
-  private final String kubernetesDnsName;
-  private int port = PORT_DEFAULT;
-  private boolean portAutoIncrement = true;
-  private String clusterNamePrefix = CLUSTER_NAME_PREFIX_DEFAULT;
-
-  /**
-   * Create a new {@link ClusterConfig} with the given parameter values.
-   */
-  private ClusterConfig(final String bootstrapServer, final String kubernetesDnsName) {
-    this.bootstrapServer = bootstrapServer;
-    this.kubernetesDnsName = kubernetesDnsName;
-  }
-
-  public boolean hasBootstrapServer() {
-    return this.bootstrapServer != null;
-  }
-
-  public String getBootstrapServer() {
-    return this.bootstrapServer;
-  }
-
-  public boolean hasKubernetesDnsName() {
-    return this.kubernetesDnsName != null;
-  }
-
-  public String getKubernetesDnsName() {
-    return this.kubernetesDnsName;
-  }
-
-  public int getPort() {
-    return this.port;
-  }
-
-  public boolean isPortAutoIncrement() {
-    return this.portAutoIncrement;
-  }
-
-  public ClusterConfig setPortAutoIncrement(final boolean portAutoIncrement) { // NOPMD
-    this.portAutoIncrement = portAutoIncrement;
-    return this;
-  }
-
-  public ClusterConfig setPort(final int port) { // NOPMD
-    this.port = port;
-    return this;
-  }
-
-  public String getClusterNamePrefix() {
-    return this.clusterNamePrefix;
-  }
-
-  public ClusterConfig setClusterNamePrefix(final String clusterNamePrefix) { // NOPMD
-    this.clusterNamePrefix = clusterNamePrefix;
-    return this;
-  }
-
-  public static ClusterConfig fromBootstrapServer(final String bootstrapServer) {
-    return new ClusterConfig(bootstrapServer, null);
-  }
-
-  public static ClusterConfig fromKubernetesDnsName(final String kubernetesDnsName) {
-    return new ClusterConfig(null, kubernetesDnsName);
-  }
-
-}
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/applicationold/ConfigurationKeys.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/applicationold/ConfigurationKeys.java
deleted file mode 100644
index 020213ad7..000000000
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/applicationold/ConfigurationKeys.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package theodolite.uc3.applicationold;
-
-/**
- * Keys to access configuration parameters.
- */
-public final class ConfigurationKeys {
-
-  public static final String BOOTSTRAP_SERVER = "BOOTSTRAP_SERVER";
-
-  public static final String KUBERNETES_DNS_NAME = "KUBERNETES_DNS_NAME";
-
-  public static final String PORT = "PORT";
-
-  public static final String PORT_AUTO_INCREMENT = "PORT_AUTO_INCREMENT";
-
-  public static final String CLUSTER_NAME_PREFIX = "CLUSTER_NAME_PREFIX";
-
-  public static final String NUM_SENSORS = "NUM_SENSORS";
-
-  public static final String PERIOD_MS = "PERIOD_MS";
-  
-  public static final String DOWNSAMPLE_INTERVAL = "DOWNSAMPLE_INTERVAL";
-  
-  public static final String WINDOW_SIZE_IN_SECONDS = "WINDOW_SIZE_IN_SECONDS";
-  
-  public static final String HOPPING_SIZE_IN_SECONDS = "HOPPING_SIZE_IN_SECONDS";
-
-  public static final String VALUE = "VALUE";
-
-  public static final String THREADS = "THREADS";
-
-  public static final String KAFKA_BOOTSTRAP_SERVERS = "KAFKA_BOOTSTRAP_SERVERS";
-
-  public static final String SCHEMA_REGISTRY_URL = "SCHEMA_REGISTRY_URL";
-
-  public static final String KAFKA_INPUT_TOPIC = "KAFKA_INPUT_TOPIC";
-  
-  public static final String KAFKA_OUTPUT_TOPIC = "KAFKA_OUTPUT_TOPIC";
-
-  public static final String KAFKA_BATCH_SIZE = "KAFKA_BATCH_SIZE";
-
-  public static final String KAFKA_LINGER_MS = "KAFKA_LINGER_MS";
-
-  public static final String KAFKA_BUFFER_MEMORY = "KAFKA_BUFFER_MEMORY";
-
-  private ConfigurationKeys() {}
-
-}
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/applicationold/HistoryService.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/applicationold/HistoryService.java
deleted file mode 100644
index afa08c576..000000000
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/applicationold/HistoryService.java
+++ /dev/null
@@ -1,303 +0,0 @@
-package theodolite.uc3.applicationold;
-
-import com.hazelcast.config.Config;
-import com.hazelcast.config.JoinConfig;
-import com.hazelcast.jet.Jet;
-import com.hazelcast.jet.JetInstance;
-import com.hazelcast.jet.aggregate.AggregateOperations;
-import com.hazelcast.jet.config.JobConfig;
-import com.hazelcast.jet.kafka.KafkaSinks;
-import com.hazelcast.jet.kafka.KafkaSources;
-import com.hazelcast.jet.pipeline.Pipeline;
-import com.hazelcast.jet.pipeline.Sinks;
-import com.hazelcast.jet.pipeline.StreamStage;
-import com.hazelcast.jet.pipeline.WindowDefinition;
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import java.time.*; // NOCS
-//import java.time.format.DateTimeFormatter;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.TimeZone;
-import java.util.concurrent.TimeUnit;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import theodolite.uc3.application.uc3specifics.HourOfDayKey;
-import theodolite.uc3.application.uc3specifics.HourOfDayKeySerializer;
-import theodolite.uc3.application.uc3specifics.HoursOfDayKeyFactory;
-import theodolite.uc3.application.uc3specifics.StatsKeyFactory;
-import titan.ccp.model.records.ActivePowerRecord;
-
-/**
- * A microservice that manages the history and, therefore, stores and aggregates incoming
- * measurements.
- */
-public class HistoryService {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
-  //private static final DateTimeFormatter TIME_FORMATTER_DEFAULT =
-  //    DateTimeFormatter.ofPattern("HH:mm:ss:SSS");
-
-  // General Information
-  private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns";
-  private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701";
-  private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
-  private static final String KAFKA_INPUT_TOPIC_DEFAULT = "input";
-  private static final String KAFKA_OUTPUT_TOPIC_DEFAULT = "output";
-  private static final String KAFKA_BSERVERS_DEFAULT = "localhost:19092";
-  // UC3 specific
-  private static final String WINDOW_SIZE_IN_SECONDS_DEFAULT = "50";
-  private static final String HOPSIZE_IN_SEC_DEFAULT = "1";
-
-
-  // Information per History Service
-  private ClusterConfig clusterConfig;
-  private Properties kafkaReadPropsForPipeline;
-  private Properties kafkaWritePropsForPipeline;
-  private String kafkaInputTopic;
-  private String kafkaOutputTopic;
-  // UC3 specific
-  private int windowSizeInSeconds;
-  private int hoppingSizeInSeconds;
-
-  /**
-   * Entrypoint for UC3 using Gradle Run.
-   */
-  public static void main(final String[] args) {
-    HistoryService.loadHistoryService().run();
-  }
-
-  /** Build a history service object to run. */
-  public static HistoryService loadHistoryService() {
-    final String bootstrapServer = System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER);
-    final String kubernetesDnsName = System.getenv(ConfigurationKeys.KUBERNETES_DNS_NAME);
-
-    ClusterConfig clusterConfig;
-    if (bootstrapServer != null) { // NOPMD
-      clusterConfig = ClusterConfig.fromBootstrapServer(bootstrapServer);
-      LOGGER.info("Use bootstrap server '{}'.", bootstrapServer);
-    } else if (kubernetesDnsName != null) { // NOPMD
-      clusterConfig = ClusterConfig.fromKubernetesDnsName(kubernetesDnsName);
-      LOGGER.info("Use Kubernetes DNC name '{}'", kubernetesDnsName);
-    } else {
-      clusterConfig = ClusterConfig.fromBootstrapServer(BOOTSTRAP_SERVER_DEFAULT);
-      LOGGER.info(// NOPMD
-          "Neitehr a bootstrap server nor a Kubernetes DNS name was provided." 
-          + "Use default bootstrap server '{}'",
-          BOOTSTRAP_SERVER_DEFAULT);
-    }
-
-    final String port = System.getenv(ConfigurationKeys.PORT);
-    if (port != null) {
-      clusterConfig.setPort(Integer.parseInt(port));
-    }
-
-    final String portAutoIncrement = System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT);
-    if (portAutoIncrement != null) {
-      clusterConfig.setPortAutoIncrement(Boolean.parseBoolean(portAutoIncrement));
-    }
-
-    final String clusterNamePrefix = System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX);
-    if (clusterNamePrefix != null) {
-      clusterConfig.setClusterNamePrefix(clusterNamePrefix);
-    }
-
-    final String kafkaBootstrapServers = Objects.requireNonNullElse(
-        System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
-        KAFKA_BSERVERS_DEFAULT);
-    final String schemaRegistryUrl = Objects.requireNonNullElse(
-        System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
-        SCHEMA_REGISTRY_URL_DEFAULT);
-    final Properties kafkaReadPropsForPipeline =
-        buildKafkaReadProps(kafkaBootstrapServers, schemaRegistryUrl);
-    final Properties kafkaWritePropsForPipeline =
-        buildKafkaWriteProps(kafkaBootstrapServers);
-
-    final String kafkaInputTopic = Objects.requireNonNullElse(
-        System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC),
-        KAFKA_INPUT_TOPIC_DEFAULT);
-
-    final String kafkaOutputTopic = Objects.requireNonNullElse(
-        System.getenv(ConfigurationKeys.KAFKA_OUTPUT_TOPIC),
-        KAFKA_OUTPUT_TOPIC_DEFAULT);
-
-    final String windowSizeInSeconds = Objects.requireNonNullElse(
-        System.getenv(ConfigurationKeys.WINDOW_SIZE_IN_SECONDS),
-        WINDOW_SIZE_IN_SECONDS_DEFAULT);
-    final int windowSizeInSecondsNumber = Integer.parseInt(windowSizeInSeconds);
-
-    final String hoppingSizeInSeconds = Objects.requireNonNullElse(
-        System.getenv(ConfigurationKeys.HOPPING_SIZE_IN_SECONDS),
-        HOPSIZE_IN_SEC_DEFAULT);
-    final int hoppingSizeInSecondsNumber = Integer.parseInt(hoppingSizeInSeconds);
-
-    return new HistoryService()
-        .setClusterConfig(clusterConfig)
-        .setKafkaReadPropertiesForPipeline(kafkaReadPropsForPipeline)
-        .setKafkaWritePropertiesForPipeline(kafkaWritePropsForPipeline)
-        .setKafkaInputTopic(kafkaInputTopic)
-        .setKafkaOutputTopic(kafkaOutputTopic)
-        .setWindowSizeInSeconds(windowSizeInSecondsNumber)
-        .setHoppingSizeInSeconds(hoppingSizeInSecondsNumber);
-  }
-
-  /** Set Cluster Config when creating History Service. */
-  private HistoryService setClusterConfig(final ClusterConfig clusterConfig) { // NOPMD
-    this.clusterConfig = clusterConfig;
-    return this;
-  }
-
-  /** Set Pipeline Kafka Read Properties. */
-  private HistoryService setKafkaReadPropertiesForPipeline(// NOPMD
-      final Properties kafkaReadPropsForPipeline) {
-    this.kafkaReadPropsForPipeline = kafkaReadPropsForPipeline;
-    return this;
-  }
-
-  /** Set Pipeline Kafka Write Properties. */
-  private HistoryService setKafkaWritePropertiesForPipeline(// NOPMD
-      final Properties kafkaWritePropsForPipeline) {
-    this.kafkaWritePropsForPipeline = kafkaWritePropsForPipeline;
-    return this;
-  }
-
-  /** Set Kafka Input topic used to build the pipeline. */
-  private HistoryService setKafkaInputTopic(final String kafkaInputTopic) { // NOPMD
-    this.kafkaInputTopic = kafkaInputTopic;
-    return this;
-  }
-
-  /** Set Kafka Output topic used to build the pipeline. */
-  private HistoryService setKafkaOutputTopic(final String kafkaOutputTopic) { // NOPMD
-    this.kafkaOutputTopic = kafkaOutputTopic;
-    return this;
-  }
-
-  /** Set the window size used in this history service (given in seconds). */
-  private HistoryService setWindowSizeInSeconds(final int windowSizeInSeconds) { // NOPMD
-    this.windowSizeInSeconds = windowSizeInSeconds;
-    return this;
-  }
-
-  /** Set the hopping size used in this history service (given in seconds). */
-  private HistoryService setHoppingSizeInSeconds(final int hoppingSizeInSeconds) { // NOPMD
-    this.hoppingSizeInSeconds = hoppingSizeInSeconds;
-    return this;
-  }
-
-  /**
-   * Defines kafka properties used to fetch data from kafka using a Hazelcast Jet pipeline.
-   *
-   * @return properties used to fetch data from kafka using a Hazelcast Jet pipeline.
-   */
-  private static Properties buildKafkaReadProps(final String kafkaBootstrapServer,
-      final String schemaRegistryUrl) {
-    final Properties props = new Properties();
-    props.put("bootstrap.servers", kafkaBootstrapServer); // NOCS
-    props.put("key.deserializer", StringDeserializer.class.getCanonicalName());
-    props.put("value.deserializer", KafkaAvroDeserializer.class);
-    props.put("specific.avro.reader", true);
-    props.put("schema.registry.url", schemaRegistryUrl);
-    props.setProperty("auto.offset.reset", "earliest");
-    return props;
-  }
-
-  /**
-   * Defines kafka properties used to write data to kafka using a Hazelcast Jet pipeline.
-   *
-   * @return properties used to fetch data from kafka using a Hazelcast Jet pipeline.
-   */
-  private static Properties buildKafkaWriteProps(final String kafkaBootstrapServer) {
-    final Properties props = new Properties();
-    props.put("bootstrap.servers", kafkaBootstrapServer); // NOCS
-    props.put("key.serializer", StringSerializer.class.getCanonicalName());
-    props.put("value.serializer", StringSerializer.class.getCanonicalName());
-    return props;
-  }
-
-  /**
-   * Start the UC3 service.
-   */
-  public void run() {
-    Objects.requireNonNull(this.clusterConfig, "No cluster config set.");
-    this.createHazelcastJetApplication();
-  }
-
-  /**
-   * Build a pipeline and start a Hazelcast Jet Instance and add a job that uses the built pipeline.
-   */
-  private void createHazelcastJetApplication() {
-   
-    // Build Pipeline for the History Service of UC3
-    final Pipeline pipeline = Pipeline.create();
-    final StreamStage<Map.Entry<String, String>> mapProduct =
-        pipeline
-            .readFrom(KafkaSources
-                .<String, ActivePowerRecord>kafka(
-                    this.kafkaReadPropsForPipeline, this.kafkaInputTopic))
-            // use Timestamps
-            .withNativeTimestamps(0)
-            // Map timestamp to hour of day and create new key using sensorID and
-            // datetime mapped to HourOfDay
-            .map(record -> {
-              String sensorId = record.getValue().getIdentifier();
-              long timestamp = record.getValue().getTimestamp();
-              LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp),
-                  TimeZone.getDefault().toZoneId());
-
-              final StatsKeyFactory<HourOfDayKey> keyFactory = new HoursOfDayKeyFactory();
-              HourOfDayKey newKey = keyFactory.createKey(sensorId, dateTime);
-
-              return Map.entry(newKey, record.getValue());
-            })
-            // group by new keys
-            .groupingKey(newRecord -> newRecord.getKey())
-            // Sliding/Hopping Window
-            .window(WindowDefinition.sliding(TimeUnit.SECONDS.toMillis(this.windowSizeInSeconds),
-                TimeUnit.SECONDS.toMillis(this.hoppingSizeInSeconds)))
-            // get average value of group (sensoreId,hourOfDay)
-            .aggregate(
-                AggregateOperations.averagingDouble(record -> record.getValue().getValueInW()))
-            // map to return pair (sensorID,hourOfDay) -> (averaged what value)
-            .map(agg -> {
-              String theValue = agg.getValue().toString();
-              String theKey = agg.getKey().toString();
-              return Map.entry(theKey, theValue);
-            });    
-    // Add Sink1: Logger
-    mapProduct.writeTo(Sinks.logger());
-    // Add Sink2: Write back to kafka for the final benchmark
-    mapProduct.writeTo(KafkaSinks.<String, String>kafka(
-        this.kafkaWritePropsForPipeline, this.kafkaOutputTopic));
-
-    // Set network config for this hazelcast jet instance
-    // Create Hazelcast Config
-    final Config config = new Config().setClusterName(this.clusterConfig.getClusterNamePrefix());
-    final JoinConfig joinConfig = config.getNetworkConfig()
-        .setPort(this.clusterConfig.getPort())
-        .setPortAutoIncrement(this.clusterConfig.isPortAutoIncrement())
-        .getJoin();
-    // Set either Bootstrap Server Member or establish Kubernetes Connection
-    joinConfig.getMulticastConfig().setEnabled(false);
-    if (this.clusterConfig.hasBootstrapServer()) {
-      joinConfig.getTcpIpConfig().addMember(this.clusterConfig.getBootstrapServer());
-    } else if (this.clusterConfig.hasKubernetesDnsName()) {
-      joinConfig.getKubernetesConfig()
-        .setEnabled(true)
-        .setProperty(HZ_KUBERNETES_SERVICE_DNS_KEY, this.clusterConfig.getKubernetesDnsName());
-    }
-    
-    // Create Hazelcast Jet Instance
-    // Add config for jet instance, config for the job and add pipeline as the job
-    final JetInstance jet = Jet.newJetInstance();
-    jet.getConfig().setHazelcastConfig(config);
-    final JobConfig pipelineConfig = new JobConfig()
-        .registerSerializer(HourOfDayKey.class, HourOfDayKeySerializer.class)
-        .setName("uc3-hazelcastjet");
-    jet.newJobIfAbsent(pipeline, pipelineConfig).join();
-  }
-
-
-}
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4HazelcastJetFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4HazelcastJetFactory.java
index 03fc362ef..fd4571f68 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4HazelcastJetFactory.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4HazelcastJetFactory.java
@@ -8,10 +8,12 @@ import java.util.Properties;
 import org.slf4j.Logger;
 import theodolite.commons.hazelcastjet.ConfigurationKeys;
 import theodolite.commons.hazelcastjet.JetInstanceBuilder;
+import theodolite.uc4.application.uc4specifics.ImmutableSensorRegistryUc4Serializer;
 import theodolite.uc4.application.uc4specifics.SensorGroupKey;
 import theodolite.uc4.application.uc4specifics.SensorGroupKeySerializer;
 import theodolite.uc4.application.uc4specifics.ValueGroup;
 import theodolite.uc4.application.uc4specifics.ValueGroupSerializer;
+import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
 
 /**
  * A Hazelcast Jet factory which can build a Hazelcast Jet Instance and Pipeline for the UC4
@@ -66,6 +68,8 @@ public class Uc4HazelcastJetFactory {
     final JobConfig jobConfig = new JobConfig()
         .registerSerializer(ValueGroup.class, ValueGroupSerializer.class)
         .registerSerializer(SensorGroupKey.class, SensorGroupKeySerializer.class)
+        .registerSerializer(ImmutableSensorRegistry.class,
+            ImmutableSensorRegistryUc4Serializer.class)
         .setName(jobName);
     this.uc4JetInstance.newJobIfAbsent(this.uc4JetPipeline, jobConfig).join();
   }
@@ -121,7 +125,7 @@ public class Uc4HazelcastJetFactory {
       throw new IllegalStateException("Kafka Config Read Properties for pipeline not set! "
           + defaultPipelineWarning);
     }
-    
+
     // Check if Properties for the Kafka Feedback Read are set.
     if (this.kafkaFeedbackPropsForPipeline == null) {
       throw new IllegalStateException("Kafka Feedback Read Properties for pipeline not set! "
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java
index 383300041..e4561c5fc 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java
@@ -95,7 +95,7 @@ public class Uc4PipelineBuilder {
           // DEBUG
           // System.out.println("D E B U G: It passed through the filter");
 
-          return Map.entry(data.getKey(), SensorRegistry.fromJson(data.getValue()));
+          return Util.entry(data.getKey(), SensorRegistry.fromJson(data.getValue()));
         });
 
     // Builds a new HashMap //
@@ -173,7 +173,7 @@ public class Uc4PipelineBuilder {
           // System.out.println("INPUT D E B U G: Got an input Stream Element!");
           // System.out.println("[SensorId=" + sensorId + "//valueinW=" + valueInW.toString());
 
-          return Map.entry(sensorId, valueInW);
+          return Util.entry(sensorId, valueInW);
         });
 
     // (1) Aggregation Stream
@@ -210,13 +210,13 @@ public class Uc4PipelineBuilder {
               if (sensorParentsCasted == null) {
                 Set<String> nullSet = new HashSet<String>();
                 nullSet.add("NULL-GROUPSET");
-                return Map.entry(sensorEvent.getKey(),
+                return Util.entry(sensorEvent.getKey(),
                     new ValueGroup(sensorEvent.getValue(), nullSet));
               } else {
                 ValueGroup valueParentsPair =
                     new ValueGroup(sensorEvent.getValue(), sensorParentsCasted);
                 // Return solution
-                return Map.entry(sensorEvent.getKey(), valueParentsPair);
+                return Util.entry(sensorEvent.getKey(), valueParentsPair);
               }
 
 
@@ -242,7 +242,7 @@ public class Uc4PipelineBuilder {
               new ArrayList<Entry<SensorGroupKey, Double>>();
           for (int i = 0; i < groupList.length; i++) {
             newKeyList[i] = new SensorGroupKey(keyGroupId, groupList[i]);
-            newEntryList.add(Map.entry(newKeyList[i], valueInW));
+            newEntryList.add(Util.entry(newKeyList[i], valueInW));
             // DEBUG
             // System.out.println("Added new Entry to list: [(" + newKeyList[i].getSensorId() + ","
             // + newKeyList[i].getGroup() + ")," + valueInW.toString());
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ImmutableSensorRegistryUc4Serializer.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ImmutableSensorRegistryUc4Serializer.java
new file mode 100644
index 000000000..1e8e4f760
--- /dev/null
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ImmutableSensorRegistryUc4Serializer.java
@@ -0,0 +1,36 @@
+package theodolite.uc4.application.uc4specifics;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.StreamSerializer;
+import java.io.IOException;
+import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
+
+/**
+ * StreamSerializer for Hazelcast Jet to serialize and deserialize an ImmutableSensorRegistry.
+ */
+public class ImmutableSensorRegistryUc4Serializer
+    implements StreamSerializer<ImmutableSensorRegistry> {
+
+  private static final int TYPE_ID = 3;
+
+  @Override
+  public int getTypeId() {
+    // TODO Auto-generated method stub
+    return TYPE_ID;
+  }
+
+  @Override
+  public void write(final ObjectDataOutput out, final ImmutableSensorRegistry object)
+      throws IOException {
+    final String sensorRegistryJson = object.toJson();
+    out.writeString(sensorRegistryJson);
+  }
+
+  @Override
+  public ImmutableSensorRegistry read(final ObjectDataInput in) throws IOException {
+    final String sensorRegistryJson = in.readString();
+    return (ImmutableSensorRegistry) ImmutableSensorRegistry.fromJson(sensorRegistryJson);
+  }
+
+}
-- 
GitLab