From 56b0b2ef8d87c99c491348c1825b752045e5b167 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Mon, 8 Mar 2021 14:28:14 +0100
Subject: [PATCH] Migrate UC4 (fka UC2) for Flink

Co-authored-by: Nico Biernat <stu209212@mail.uni-kiel.de>
---
 theodolite-benchmarks/settings.gradle         |   2 +-
 .../.settings/org.eclipse.jdt.ui.prefs        |   3 +
 .../qa.eclipse.plugin.checkstyle.prefs        |   4 +
 .../.settings/qa.eclipse.plugin.pmd.prefs     |   4 +
 .../uc4-application-flink/Dockerfile          |   3 +
 .../uc4-application-flink/build.gradle        |   1 +
 .../AggregationServiceFlinkJob.java           | 329 ++++++++++++++++++
 .../ChildParentsFlatMapFunction.java          | 101 ++++++
 .../uc4/application/ConfigurationKeys.java    |  37 ++
 .../JoinAndDuplicateCoFlatMapFunction.java    |  57 +++
 ...uplicateKeyedBroadcastProcessFunction.java |  53 +++
 ...ecordAggregationProcessWindowFunction.java |  96 +++++
 .../ImmutableSensorRegistrySerializer.java    |  24 ++
 .../util/ImmutableSetSerializer.java          |  47 +++
 .../uc4/application/util/SensorParentKey.java |  46 +++
 .../util/SensorParentKeySerializer.java       |  27 ++
 .../resources/META-INF/application.properties |  15 +
 17 files changed, 848 insertions(+), 1 deletion(-)
 create mode 100644 theodolite-benchmarks/uc4-application-flink/.settings/org.eclipse.jdt.ui.prefs
 create mode 100644 theodolite-benchmarks/uc4-application-flink/.settings/qa.eclipse.plugin.checkstyle.prefs
 create mode 100644 theodolite-benchmarks/uc4-application-flink/.settings/qa.eclipse.plugin.pmd.prefs
 create mode 100644 theodolite-benchmarks/uc4-application-flink/Dockerfile
 create mode 100644 theodolite-benchmarks/uc4-application-flink/build.gradle
 create mode 100644 theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
 create mode 100644 theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java
 create mode 100644 theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java
 create mode 100644 theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java
 create mode 100644 theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java
 create mode 100644 theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java
 create mode 100644 theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java
 create mode 100644 theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java
 create mode 100644 theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/SensorParentKey.java
 create mode 100644 theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/SensorParentKeySerializer.java
 create mode 100644 theodolite-benchmarks/uc4-application-flink/src/main/resources/META-INF/application.properties

diff --git a/theodolite-benchmarks/settings.gradle b/theodolite-benchmarks/settings.gradle
index ad1cb4f4c..6d499616f 100644
--- a/theodolite-benchmarks/settings.gradle
+++ b/theodolite-benchmarks/settings.gradle
@@ -18,4 +18,4 @@ include 'uc3-application-flink' // TODO Rename to uc3-flink
 
 include 'uc4-workload-generator' // TODO Rename to uc4-load-generator
 include 'uc4-application' // TODO Rename to uc4-kstreams
-//include 'uc4-application-flink' // TODO Rename to uc4-flink
+include 'uc4-application-flink' // TODO Rename to uc4-flink
diff --git a/theodolite-benchmarks/uc4-application-flink/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc4-application-flink/.settings/org.eclipse.jdt.ui.prefs
new file mode 100644
index 000000000..c8291661b
--- /dev/null
+++ b/theodolite-benchmarks/uc4-application-flink/.settings/org.eclipse.jdt.ui.prefs
@@ -0,0 +1,3 @@
+cleanup_settings_version=2
+eclipse.preferences.version=1
+formatter_settings_version=15
diff --git a/theodolite-benchmarks/uc4-application-flink/.settings/qa.eclipse.plugin.checkstyle.prefs b/theodolite-benchmarks/uc4-application-flink/.settings/qa.eclipse.plugin.checkstyle.prefs
new file mode 100644
index 000000000..87860c815
--- /dev/null
+++ b/theodolite-benchmarks/uc4-application-flink/.settings/qa.eclipse.plugin.checkstyle.prefs
@@ -0,0 +1,4 @@
+configFilePath=../config/checkstyle.xml
+customModulesJarPaths=
+eclipse.preferences.version=1
+enabled=true
diff --git a/theodolite-benchmarks/uc4-application-flink/.settings/qa.eclipse.plugin.pmd.prefs b/theodolite-benchmarks/uc4-application-flink/.settings/qa.eclipse.plugin.pmd.prefs
new file mode 100644
index 000000000..efbcb8c9e
--- /dev/null
+++ b/theodolite-benchmarks/uc4-application-flink/.settings/qa.eclipse.plugin.pmd.prefs
@@ -0,0 +1,4 @@
+customRulesJars=
+eclipse.preferences.version=1
+enabled=true
+ruleSetFilePath=../config/pmd.xml
diff --git a/theodolite-benchmarks/uc4-application-flink/Dockerfile b/theodolite-benchmarks/uc4-application-flink/Dockerfile
new file mode 100644
index 000000000..59ec78d72
--- /dev/null
+++ b/theodolite-benchmarks/uc4-application-flink/Dockerfile
@@ -0,0 +1,3 @@
+FROM nicobiernat/flink:1.11-scala_2.12-java_11
+
+ADD build/libs/uc4-application-all.jar /opt/flink/usrlib/artifacts/uc4-application-all.jar
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc4-application-flink/build.gradle b/theodolite-benchmarks/uc4-application-flink/build.gradle
new file mode 100644
index 000000000..0ad804c62
--- /dev/null
+++ b/theodolite-benchmarks/uc4-application-flink/build.gradle
@@ -0,0 +1 @@
+mainClassName = "theodolite.uc4.application.AggregationServiceFlinkJob"
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
new file mode 100644
index 000000000..e56e67573
--- /dev/null
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
@@ -0,0 +1,329 @@
+package theodolite.uc4.application;
+
+import org.apache.commons.configuration2.Configuration;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
+import theodolite.commons.flink.serialization.FlinkMonitoringRecordSerde;
+import theodolite.uc4.application.util.ImmutableSensorRegistrySerializer;
+import theodolite.uc4.application.util.ImmutableSetSerializer;
+import theodolite.uc4.application.util.SensorParentKey;
+import theodolite.uc4.application.util.SensorParentKeySerializer;
+import titan.ccp.common.configuration.Configurations;
+import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
+import titan.ccp.configuration.events.Event;
+import titan.ccp.configuration.events.EventSerde;
+import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
+import titan.ccp.model.sensorregistry.SensorRegistry;
+import titan.ccp.models.records.ActivePowerRecord;
+import titan.ccp.models.records.ActivePowerRecordFactory;
+import titan.ccp.models.records.AggregatedActivePowerRecord;
+import titan.ccp.models.records.AggregatedActivePowerRecordFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * The Aggregation Microservice Flink Job.
+ */
+public class AggregationServiceFlinkJob {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(AggregationServiceFlinkJob.class);
+
+  private final Configuration config = Configurations.create();
+
+  private void run() {
+    // Configurations
+    final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
+    final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
+    final String applicationId = applicationName + "-" + applicationVersion;
+    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
+    final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
+    final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
+    final Time windowSize = Time.milliseconds(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS));
+    final Duration windowGrace = Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS));
+    final String configurationTopic = this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC);
+    final String stateBackend = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
+    final String stateBackendPath = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
+    final int memoryStateBackendSize = this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE, MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
+    final boolean debug = this.config.getBoolean(ConfigurationKeys.DEBUG, true);
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
+
+    final Properties kafkaProps = new Properties();
+    kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
+    kafkaProps.setProperty("group.id", applicationId);
+
+    // Sources and Sinks with Serializer and Deserializer
+
+    // Source from input topic with ActivePowerRecords
+    final FlinkMonitoringRecordSerde<ActivePowerRecord, ActivePowerRecordFactory> inputSerde =
+        new FlinkMonitoringRecordSerde<>(inputTopic,
+            ActivePowerRecord.class,
+            ActivePowerRecordFactory.class);
+
+    final FlinkKafkaConsumer<ActivePowerRecord> kafkaInputSource = new FlinkKafkaConsumer<>(
+        inputTopic, inputSerde, kafkaProps);
+
+    kafkaInputSource.setStartFromGroupOffsets();
+    if (checkpointing) {
+      kafkaInputSource.setCommitOffsetsOnCheckpoints(true);
+    }
+
+    // Source from output topic with AggregatedPowerRecords
+    final FlinkMonitoringRecordSerde<AggregatedActivePowerRecord, AggregatedActivePowerRecordFactory> outputSerde =
+        new FlinkMonitoringRecordSerde<>(inputTopic,
+            AggregatedActivePowerRecord.class,
+            AggregatedActivePowerRecordFactory.class);
+
+    final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource = new FlinkKafkaConsumer<>(
+        outputTopic, outputSerde, kafkaProps);
+
+    kafkaOutputSource.setStartFromGroupOffsets();
+    if (checkpointing) {
+      kafkaOutputSource.setCommitOffsetsOnCheckpoints(true);
+    }
+
+    // Source from configuration topic with EventSensorRegistry JSON
+    final FlinkKafkaKeyValueSerde<Event, String> configSerde =
+        new FlinkKafkaKeyValueSerde<>(
+            configurationTopic,
+            EventSerde::serde,
+            Serdes::String,
+            TypeInformation.of(new TypeHint<Tuple2<Event, String>>() {
+            }));
+
+    final FlinkKafkaConsumer<Tuple2<Event, String>> kafkaConfigSource = new FlinkKafkaConsumer<>(
+        configurationTopic, configSerde, kafkaProps);
+    kafkaConfigSource.setStartFromGroupOffsets();
+    if (checkpointing) {
+      kafkaConfigSource.setCommitOffsetsOnCheckpoints(true);
+    }
+
+    // Sink to output topic with SensorId, AggregatedActivePowerRecord
+    FlinkKafkaKeyValueSerde<String, AggregatedActivePowerRecord> aggregationSerde =
+        new FlinkKafkaKeyValueSerde<>(
+            outputTopic,
+            Serdes::String,
+            () -> IMonitoringRecordSerde.serde(new AggregatedActivePowerRecordFactory()),
+            TypeInformation.of(new TypeHint<Tuple2<String, AggregatedActivePowerRecord>>() {
+            }));
+
+    final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink =
+        new FlinkKafkaProducer<>(
+            outputTopic,
+            aggregationSerde,
+            kafkaProps,
+            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
+    kafkaAggregationSink.setWriteTimestampToKafka(true);
+
+    // Execution environment configuration
+//    org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration();
+//    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+//    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
+    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+    if (checkpointing) {
+      env.enableCheckpointing(commitIntervalMs);
+    }
+
+    // State Backend
+    if (stateBackend.equals("filesystem")) {
+      env.setStateBackend(new FsStateBackend(stateBackendPath));
+    } else if (stateBackend.equals("rocksdb")) {
+      try {
+        env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    } else {
+      env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
+    }
+
+    // Kryo serializer registration
+    env.getConfig().registerTypeWithKryoSerializer(ActivePowerRecord.class,
+        new FlinkMonitoringRecordSerde<>(
+            inputTopic,
+            ActivePowerRecord.class,
+            ActivePowerRecordFactory.class));
+
+    env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class, new ImmutableSensorRegistrySerializer());
+    env.getConfig().registerTypeWithKryoSerializer(SensorParentKey.class, new SensorParentKeySerializer());
+
+    env.getConfig().registerTypeWithKryoSerializer(Set.of().getClass(), new ImmutableSetSerializer());
+    env.getConfig().registerTypeWithKryoSerializer(Set.of(1).getClass(), new ImmutableSetSerializer());
+    env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), new ImmutableSetSerializer());
+
+    env.getConfig().getRegisteredTypesWithKryoSerializers().forEach((c, s) ->
+        LOGGER.info("Class " + c.getName() + " registered with serializer "
+            + s.getSerializer().getClass().getName()));
+
+    // Streaming topology
+
+    // Build input stream
+    final DataStream<ActivePowerRecord> inputStream = env.addSource(kafkaInputSource)
+        .name("[Kafka Consumer] Topic: " + inputTopic)
+        .rebalance()
+        .map(r -> r)
+        .name("[Map] Rebalance Forward");
+
+    // Build aggregation stream
+    final DataStream<ActivePowerRecord> aggregationsInputStream = env.addSource(kafkaOutputSource)
+        .name("[Kafka Consumer] Topic: " + outputTopic)
+        .rebalance()
+        .map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()))
+        .name("[Map] AggregatedActivePowerRecord -> ActivePowerRecord");
+
+    // Merge input and aggregation streams
+    final DataStream<ActivePowerRecord> mergedInputStream = inputStream
+        .union(aggregationsInputStream);
+
+    if (debug) {
+      mergedInputStream
+          .map(new MapFunction<ActivePowerRecord, String>() {
+            @Override
+            public String map(ActivePowerRecord value) throws Exception {
+              return
+                  "ActivePowerRecord { "
+                      + "identifier: " + value.getIdentifier() + ", "
+                      + "timestamp: " + value.getTimestamp() + ", "
+                      + "valueInW: " + value.getValueInW() + " }";
+            }
+          })
+          .name("[Map] toString")
+          .print();
+    }
+    // Build parent sensor stream from configuration stream
+    final DataStream<Tuple2<String, Set<String>>> configurationsStream =
+        env.addSource(kafkaConfigSource)
+            .name("[Kafka Consumer] Topic: " + configurationTopic)
+            .filter(tuple -> tuple.f0 == Event.SENSOR_REGISTRY_CHANGED
+                || tuple.f0 == Event.SENSOR_REGISTRY_STATUS).name("[Filter] SensorRegistry changed")
+            .map(new MapFunction<Tuple2<Event, String>, SensorRegistry>() {
+              @Override
+              public SensorRegistry map(Tuple2<Event, String> tuple) {
+//                Gson gson = new GsonBuilder().setPrettyPrinting().create();
+//                String prettyJsonString = gson.toJson(new JsonParser().parse(tuple.f1));
+//                LOGGER.info("SensorRegistry: " + prettyJsonString);
+                return SensorRegistry.fromJson(tuple.f1);
+              }
+            }).name("[Map] JSON -> SensorRegistry")
+            .keyBy(sr -> 1)
+            .flatMap(new ChildParentsFlatMapFunction())
+            .name("[FlatMap] SensorRegistry -> (ChildSensor, ParentSensor[])");
+
+    DataStream<Tuple2<SensorParentKey, ActivePowerRecord>> lastValueStream =
+        mergedInputStream.connect(configurationsStream)
+            .keyBy(ActivePowerRecord::getIdentifier,
+                ((KeySelector<Tuple2<String, Set<String>>, String>) t -> (String) t.f0))
+            .flatMap(new JoinAndDuplicateCoFlatMapFunction())
+            .name("[CoFlatMap] Join input-config, Flatten to ((Sensor, Group), ActivePowerRecord)");
+
+//    KeyedStream<ActivePowerRecord, String> keyedStream =
+//        mergedInputStream.keyBy(ActivePowerRecord::getIdentifier);
+//
+//    MapStateDescriptor<String, Set<String>> sensorConfigStateDescriptor =
+//        new MapStateDescriptor<>(
+//            "join-and-duplicate-state",
+//            BasicTypeInfo.STRING_TYPE_INFO,
+//            TypeInformation.of(new TypeHint<Set<String>>() {}));
+//
+//    BroadcastStream<Tuple2<String, Set<String>>> broadcastStream =
+//        configurationsStream.keyBy(t -> t.f0).broadcast(sensorConfigStateDescriptor);
+//
+//    DataStream<Tuple2<SensorParentKey, ActivePowerRecord>> lastValueStream =
+//        keyedStream.connect(broadcastStream)
+//        .process(new JoinAndDuplicateKeyedBroadcastProcessFunction());
+
+    if (debug) {
+      lastValueStream
+          .map(new MapFunction<Tuple2<SensorParentKey, ActivePowerRecord>, String>() {
+            @Override
+            public String map(Tuple2<SensorParentKey, ActivePowerRecord> t) throws Exception {
+              return "<" + t.f0.getSensor() + "|" + t.f0.getParent() + ">" + "ActivePowerRecord {"
+                  + "identifier: " + t.f1.getIdentifier() + ", "
+                  + "timestamp: " + t.f1.getTimestamp() + ", "
+                  + "valueInW: " + t.f1.getValueInW() + " }";
+            }
+          })
+          .print();
+    }
+
+    DataStream<AggregatedActivePowerRecord> aggregationStream = lastValueStream
+        .rebalance()
+        .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(windowGrace))
+        .keyBy(t -> t.f0.getParent())
+        .window(TumblingEventTimeWindows.of(windowSize))
+        .process(new RecordAggregationProcessWindowFunction())
+        .name("[Aggregate] ((Sensor, Group), ActivePowerRecord) -> AggregatedActivePowerRecord");
+
+    // add Kafka Sink
+    aggregationStream
+        .map(new MapFunction<AggregatedActivePowerRecord, Tuple2<String, AggregatedActivePowerRecord>>() {
+          @Override
+          public Tuple2<String, AggregatedActivePowerRecord> map(AggregatedActivePowerRecord value) throws Exception {
+            return new Tuple2<>(value.getIdentifier(), value);
+          }
+        })
+        .name("[Map] AggregatedActivePowerRecord -> (Sensor, AggregatedActivePowerRecord)")
+        .addSink(kafkaAggregationSink).name("[Kafka Producer] Topic: " + outputTopic);
+
+    // add stdout sink
+    if (debug) {
+        aggregationStream
+            .map(new MapFunction<AggregatedActivePowerRecord, String>() {
+              @Override
+              public String map(AggregatedActivePowerRecord value) throws Exception {
+                return
+                    "AggregatedActivePowerRecord { "
+                        + "identifier: " + value.getIdentifier() + ", "
+                        + "timestamp: " + value.getTimestamp() + ", "
+                        + "minInW: " + value.getMinInW() + ", "
+                        + "maxInW: " + value.getMaxInW() + ", "
+                        + "count: " + value.getCount() + ", "
+                        + "sumInW: " + value.getSumInW() + ", "
+                        + "avgInW: " + value.getAverageInW() + " }";
+              }
+            })
+            .name("[Map] toString")
+            .print();
+    }
+    // Execution plan
+
+    if (LOGGER.isInfoEnabled()) {
+      LOGGER.info("Execution Plan:\n" + env.getExecutionPlan());
+    }
+
+    // Execute Job
+
+    try {
+      env.execute(applicationId);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  public static void main(final String[] args) {
+    new AggregationServiceFlinkJob().run();
+  }
+}
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java
new file mode 100644
index 000000000..8cfbaddc9
--- /dev/null
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java
@@ -0,0 +1,101 @@
+package theodolite.uc4.application;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import titan.ccp.model.sensorregistry.AggregatedSensor;
+import titan.ccp.model.sensorregistry.Sensor;
+import titan.ccp.model.sensorregistry.SensorRegistry;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Transforms a {@link SensorRegistry} into key value pairs of Sensor identifiers and their parents'
+ * sensor identifiers. All pairs whose sensor's parents have changed since last iteration are
+ * forwarded. A mapping of an identifier to <code>null</code> means that the corresponding sensor
+ * does not longer exists in the sensor registry.
+ */
+public class ChildParentsFlatMapFunction extends RichFlatMapFunction<SensorRegistry, Tuple2<String, Set<String>>> {
+
+  private static final long serialVersionUID = 3969444219510915221L;
+
+  private transient MapState<String, Set<String>> state;
+
+  @Override
+  public void open(Configuration parameters) {
+    MapStateDescriptor<String, Set<String>> descriptor =
+        new MapStateDescriptor<String, Set<String>>(
+            "child-parents-state",
+            TypeInformation.of(new TypeHint<String>(){}),
+            TypeInformation.of(new TypeHint<Set<String>>(){}));
+    this.state = getRuntimeContext().getMapState(descriptor);
+  }
+
+  @Override
+  public void flatMap(SensorRegistry value, Collector<Tuple2<String, Set<String>>> out)
+      throws Exception {
+    final Map<String, Set<String>> childParentsPairs = this.constructChildParentsPairs(value);
+    this.updateChildParentsPairs(childParentsPairs);
+    this.updateState(childParentsPairs);
+    childParentsPairs
+        .entrySet()
+        .stream()
+        .map(e -> new Tuple2<>(e.getKey(), e.getValue()))
+        .forEach(out::collect);
+  }
+
+  private Map<String, Set<String>> constructChildParentsPairs(final SensorRegistry registry) {
+    return this.streamAllChildren(registry.getTopLevelSensor())
+        .collect(Collectors.toMap(
+            Sensor::getIdentifier,
+            child -> child.getParent()
+                .map(p -> Set.of(p.getIdentifier()))
+                .orElseGet(Set::of)));
+  }
+
+  private Stream<Sensor> streamAllChildren(final AggregatedSensor sensor) {
+    return sensor.getChildren().stream()
+        .flatMap(s -> Stream.concat(
+            Stream.of(s),
+            s instanceof AggregatedSensor ? this.streamAllChildren((AggregatedSensor) s)
+                : Stream.empty()));
+  }
+
+  private void updateChildParentsPairs(final Map<String, Set<String>> childParentsPairs)
+      throws Exception {
+    final Iterator<Map.Entry<String, Set<String>>> oldChildParentsPairs = this.state.iterator();
+    while (oldChildParentsPairs.hasNext()) {
+      final Map.Entry<String, Set<String>> oldChildParentPair = oldChildParentsPairs.next();
+      final String identifier = oldChildParentPair.getKey();
+      final Set<String> oldParents = oldChildParentPair.getValue();
+      final Set<String> newParents = childParentsPairs.get(identifier); // null if not exists
+      if (newParents == null) {
+        // Sensor was deleted
+        childParentsPairs.put(identifier, null);
+      } else if (newParents.equals(oldParents)) {
+        // No changes
+        childParentsPairs.remove(identifier);
+      }
+      // Else: Later Perhaps: Mark changed parents
+    }
+  }
+
+  private void updateState(final Map<String, Set<String>> childParentsPairs) throws Exception {
+    for (final Map.Entry<String, Set<String>> childParentPair : childParentsPairs.entrySet()) {
+      if (childParentPair.getValue() == null) {
+        this.state.remove(childParentPair.getKey());
+      } else {
+        this.state.put(childParentPair.getKey(), childParentPair.getValue());
+      }
+    }
+  }
+}
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java
new file mode 100644
index 000000000..a5be34aeb
--- /dev/null
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java
@@ -0,0 +1,37 @@
+package theodolite.uc4.application;
+
+/**
+ * Keys to access configuration parameters.
+ */
+public final class ConfigurationKeys {
+  public static final String APPLICATION_NAME = "application.name";
+
+  public static final String APPLICATION_VERSION = "application.version";
+
+  public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic";
+
+  public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
+
+  public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
+
+  public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
+
+  public static final String WINDOW_SIZE_MS = "window.size.ms";
+
+  public static final String WINDOW_GRACE_MS = "window.grace.ms";
+
+  public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
+
+  public static final String FLINK_STATE_BACKEND = "flink.state.backend";
+
+  public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";
+
+  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = "flink.state.backend.memory.size";
+
+  public static final String DEBUG = "debug";
+
+  public static final String CHECKPOINTING = "checkpointing";
+
+  private ConfigurationKeys() {}
+
+}
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java
new file mode 100644
index 000000000..bd335132b
--- /dev/null
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java
@@ -0,0 +1,57 @@
+package theodolite.uc4.application;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.util.Collector;
+
+import theodolite.uc4.application.util.SensorParentKey;
+import titan.ccp.models.records.ActivePowerRecord;
+
+import java.util.Set;
+
+public class JoinAndDuplicateCoFlatMapFunction extends RichCoFlatMapFunction<ActivePowerRecord, Tuple2<String, Set<String>>, Tuple2<SensorParentKey, ActivePowerRecord>> {
+
+  private static final long serialVersionUID = -6992783644887835979L;
+
+  private transient MapState<String, Set<String>> state;
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    MapStateDescriptor<String, Set<String>> descriptor =
+        new MapStateDescriptor<String, Set<String>>(
+            "join-and-duplicate-state",
+            TypeInformation.of(new TypeHint<String>(){}),
+            TypeInformation.of(new TypeHint<Set<String>>(){}));
+    this.state = getRuntimeContext().getMapState(descriptor);
+  }
+
+  @Override
+  public void flatMap1(ActivePowerRecord value, Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
+    Set<String> parents = this.state.get(value.getIdentifier());
+    if (parents == null) return;
+    for (String parent : parents) {
+      out.collect(new Tuple2<>(new SensorParentKey(value.getIdentifier(), parent), value));
+    }
+  }
+
+  @Override
+  public void flatMap2(Tuple2<String, Set<String>> value, Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
+    Set<String> oldParents = this.state.get(value.f0);
+    if (oldParents != null) {
+      Set<String> newParents = value.f1;
+      if (!newParents.equals(oldParents)) {
+        for (String oldParent : oldParents) {
+          if (!newParents.contains(oldParent)) {
+            out.collect(new Tuple2<>(new SensorParentKey(value.f0, oldParent), null));
+          }
+        }
+      }
+    }
+    this.state.put(value.f0, value.f1);
+  }
+}
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java
new file mode 100644
index 000000000..ad24310b0
--- /dev/null
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java
@@ -0,0 +1,53 @@
+package theodolite.uc4.application;
+
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.util.Collector;
+
+import theodolite.uc4.application.util.SensorParentKey;
+import titan.ccp.models.records.ActivePowerRecord;
+
+import java.util.Set;
+
+public class JoinAndDuplicateKeyedBroadcastProcessFunction extends KeyedBroadcastProcessFunction<String, ActivePowerRecord, Tuple2<String, Set<String>>, Tuple2<SensorParentKey, ActivePowerRecord>> {
+
+  private static final long serialVersionUID = -4525438547262992821L;
+
+  private final MapStateDescriptor<String, Set<String>> sensorConfigStateDescriptor =
+      new MapStateDescriptor<>(
+          "join-and-duplicate-state",
+          BasicTypeInfo.STRING_TYPE_INFO,
+          TypeInformation.of(new TypeHint<Set<String>>() {}));
+
+  @Override
+  public void processElement(ActivePowerRecord value, ReadOnlyContext ctx, Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
+    Set<String> parents = ctx.getBroadcastState(sensorConfigStateDescriptor).get(value.getIdentifier());
+    if (parents == null) return;
+    for (String parent : parents) {
+      out.collect(new Tuple2<>(new SensorParentKey(value.getIdentifier(), parent), value));
+    }
+  }
+
+  @Override
+  public void processBroadcastElement(Tuple2<String, Set<String>> value, Context ctx, Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
+    BroadcastState<String, Set<String>> state = ctx.getBroadcastState(sensorConfigStateDescriptor);
+    Set<String> oldParents = state.get(value.f0);
+    if (oldParents != null) {
+      Set<String> newParents = value.f1;
+      if (!newParents.equals(oldParents)) {
+        for (String oldParent : oldParents) {
+          if (!newParents.contains(oldParent)) {
+            out.collect(new Tuple2<>(new SensorParentKey(value.f0, oldParent), null));
+          }
+        }
+      }
+    }
+    state.put(value.f0, value.f1);
+  }
+
+}
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java
new file mode 100644
index 000000000..db0a47721
--- /dev/null
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java
@@ -0,0 +1,96 @@
+package theodolite.uc4.application;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import theodolite.uc4.application.util.SensorParentKey;
+import titan.ccp.models.records.ActivePowerRecord;
+import titan.ccp.models.records.AggregatedActivePowerRecord;
+
+public class RecordAggregationProcessWindowFunction extends ProcessWindowFunction<Tuple2<SensorParentKey, ActivePowerRecord>, AggregatedActivePowerRecord, String, TimeWindow> {
+
+  private static final long serialVersionUID = 6030159552332624435L;
+
+  private transient MapState<SensorParentKey, ActivePowerRecord> lastValueState;
+  private transient ValueState<AggregatedActivePowerRecord> aggregateState;
+
+  @Override
+  public void open(org.apache.flink.configuration.Configuration parameters) {
+    final MapStateDescriptor<SensorParentKey, ActivePowerRecord> lastValueStateDescriptor =
+        new MapStateDescriptor<SensorParentKey, ActivePowerRecord>(
+            "last-value-state",
+            TypeInformation.of(new TypeHint<SensorParentKey>() {
+            }),
+            TypeInformation.of(new TypeHint<ActivePowerRecord>() {
+            }));
+    this.lastValueState = getRuntimeContext().getMapState(lastValueStateDescriptor);
+
+    final ValueStateDescriptor<AggregatedActivePowerRecord> aggregateStateDescriptor =
+        new ValueStateDescriptor<AggregatedActivePowerRecord>(
+            "aggregation-state",
+            TypeInformation.of(new TypeHint<AggregatedActivePowerRecord>() {
+            }));
+    this.aggregateState = getRuntimeContext().getState(aggregateStateDescriptor);
+  }
+
+  @Override
+  public void process(String key, Context context, Iterable<Tuple2<SensorParentKey, ActivePowerRecord>> elements, Collector<AggregatedActivePowerRecord> out) throws Exception {
+    for (Tuple2<SensorParentKey, ActivePowerRecord> t : elements) {
+      AggregatedActivePowerRecord currentAggregate = this.aggregateState.value();
+      if (currentAggregate == null) {
+        currentAggregate = new AggregatedActivePowerRecord(key, 0, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, 0, 0, 0);
+        this.aggregateState.update(currentAggregate);
+      }
+      long count = currentAggregate.getCount();
+
+      final SensorParentKey sensorParentKey = t.f0;
+      ActivePowerRecord newRecord = t.f1;
+      if (newRecord == null) { // sensor was deleted -> decrease count, set newRecord to zero
+        count--;
+        newRecord = new ActivePowerRecord(sensorParentKey.getSensor(), 0, 0.0);
+      }
+
+      // get last value of this record from state or create 0 valued record
+      ActivePowerRecord previousRecord = this.lastValueState.get(sensorParentKey);
+      if (previousRecord == null) { // sensor was added -> increase count
+        count++;
+        previousRecord = new ActivePowerRecord(sensorParentKey.getSensor(), 0, 0.0);
+      }
+
+      // if incoming record is older than the last saved record, skip the record
+      if (newRecord.getTimestamp() < previousRecord.getTimestamp()) {
+        continue;
+      }
+
+      // prefer newer timestamp, but use previous if 0 -> sensor was deleted
+      long timestamp = newRecord.getTimestamp() == 0 ? previousRecord.getTimestamp() : newRecord.getTimestamp();
+      double sumInW = currentAggregate.getSumInW() - previousRecord.getValueInW() + newRecord.getValueInW();
+      double avgInW = count == 0 ? 0 : sumInW / count;
+
+      AggregatedActivePowerRecord newAggregate = new AggregatedActivePowerRecord(
+          sensorParentKey.getParent(),
+          timestamp,
+          Math.min(currentAggregate.getMinInW(), newRecord.getValueInW()),
+          Math.max(currentAggregate.getMaxInW(), newRecord.getValueInW()),
+          count,
+          sumInW,
+          avgInW
+      );
+
+      // update state and aggregateState
+      this.lastValueState.put(sensorParentKey, newRecord);
+      this.aggregateState.update(newAggregate);
+    }
+
+    // emit aggregated record
+    out.collect(this.aggregateState.value());
+  }
+}
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java
new file mode 100644
index 000000000..0de9a500c
--- /dev/null
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java
@@ -0,0 +1,24 @@
+package theodolite.uc4.application.util;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
+
+import java.io.Serializable;
+
+public class ImmutableSensorRegistrySerializer extends Serializer<ImmutableSensorRegistry> implements Serializable {
+
+  private static final long serialVersionUID = 1806411056006113017L;
+
+  @Override
+  public void write(Kryo kryo, Output output, ImmutableSensorRegistry object) {
+    output.writeString(object.toJson());
+  }
+
+  @Override
+  public ImmutableSensorRegistry read(Kryo kryo, Input input, Class<ImmutableSensorRegistry> type) {
+    return (ImmutableSensorRegistry) ImmutableSensorRegistry.fromJson(input.readString());
+  }
+}
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java
new file mode 100644
index 000000000..c3117051f
--- /dev/null
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java
@@ -0,0 +1,47 @@
+package theodolite.uc4.application.util;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.Serializable;
+import java.util.Set;
+
+public final class ImmutableSetSerializer extends Serializer<Set<Object>> implements Serializable {
+
+  public ImmutableSetSerializer() {
+    super(false, true);
+  }
+
+  @Override
+  public void write(Kryo kryo, Output output, Set<Object> object) {
+    output.writeInt(object.size(), true);
+    for (final Object elm : object) {
+      kryo.writeClassAndObject(output, elm);
+    }
+  }
+
+  @Override
+  public Set<Object> read(Kryo kryo, Input input, Class<Set<Object>> type) {
+    final int size = input.readInt(true);
+    final Object[] list = new Object[size];
+    for (int i = 0; i < size; ++i) {
+      list[i] = kryo.readClassAndObject(input);
+    }
+    return Set.of(list);
+  }
+
+  /**
+   * Creates a new {@link ImmutableSetSerializer} and registers its serializer
+   * for the several related classes
+   *
+   * @param kryo the {@link Kryo} instance to set the serializer on
+   */
+  public static void registerSerializers(Kryo kryo) {
+    final ImmutableSetSerializer serializer = new ImmutableSetSerializer();
+    kryo.register(Set.of().getClass(), serializer);
+    kryo.register(Set.of(1).getClass(), serializer);
+    kryo.register(Set.of(1, 2, 3, 4).getClass(), serializer);
+  }
+}
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/SensorParentKey.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/SensorParentKey.java
new file mode 100644
index 000000000..657918a1c
--- /dev/null
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/SensorParentKey.java
@@ -0,0 +1,46 @@
+package theodolite.uc4.application.util;
+
+import java.util.Objects;
+
+/**
+ * A key consisting of the identifier of a sensor and an identifier of parent sensor.
+ */
+public class SensorParentKey {
+
+  private final String sensorIdentifier;
+
+  private final String parentIdentifier;
+
+  public SensorParentKey(final String sensorIdentifier, final String parentIdentifier) {
+    this.sensorIdentifier = sensorIdentifier;
+    this.parentIdentifier = parentIdentifier;
+  }
+
+  public String getSensor() {
+    return this.sensorIdentifier;
+  }
+
+  public String getParent() {
+    return this.parentIdentifier;
+  }
+
+  @Override
+  public String toString() {
+    return "{" + this.sensorIdentifier + ", " + this.parentIdentifier + "}";
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(sensorIdentifier, parentIdentifier);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) return true;
+    if (!(obj instanceof SensorParentKey)) return false;
+    final SensorParentKey k = (SensorParentKey) obj;
+    return sensorIdentifier.equals(k.sensorIdentifier) && parentIdentifier.equals(k.parentIdentifier);
+  }
+
+
+}
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/SensorParentKeySerializer.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/SensorParentKeySerializer.java
new file mode 100644
index 000000000..ad63f9c3b
--- /dev/null
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/SensorParentKeySerializer.java
@@ -0,0 +1,27 @@
+package theodolite.uc4.application.util;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.Serializable;
+
+/**
+ * Kryo serializer for {@link SensorParentKey}.
+ */
+public final class SensorParentKeySerializer extends Serializer<SensorParentKey> implements Serializable {
+
+  @Override
+  public void write(Kryo kryo, Output output, SensorParentKey object) {
+    output.writeString(object.getSensor());
+    output.writeString(object.getParent());
+  }
+
+  @Override
+  public SensorParentKey read(Kryo kryo, Input input, Class<SensorParentKey> type) {
+    final String sensor = input.readString();
+    final String parent = input.readString();
+    return new SensorParentKey(sensor, parent);
+  }
+}
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-application-flink/src/main/resources/META-INF/application.properties
new file mode 100644
index 000000000..571a090c3
--- /dev/null
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/resources/META-INF/application.properties
@@ -0,0 +1,15 @@
+application.name=theodolite-uc2-application
+application.version=0.0.1
+
+configuration.host=localhost
+configuration.port=8082
+configuration.kafka.topic=configuration
+
+kafka.bootstrap.servers=localhost:9092
+kafka.input.topic=input
+kafka.output.topic=output
+window.size.ms=1000
+window.grace.ms=0
+num.threads=1
+commit.interval.ms=1000
+cache.max.bytes.buffering=-1
-- 
GitLab