From c7f4cbcf9bb657d251aad1dca627709075107138 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de>
Date: Sat, 28 Nov 2020 15:33:36 +0100
Subject: [PATCH] Configure Kafka Streams configurations in common class and
 add more

Configure the Kafka Streams configuration in a common class and not
add them in every UC application. This enhances maintainability of
common shared configurations.
Further enable more configurations to set for the Kafka Streams
application.
---
 .../kafkastreams/ConfigurationKeys.java       |   6 -
 .../kafkastreams/KafkaStreamsBuilder.java     | 176 +++++++-----------
 build.gradle                                  |   1 +
 .../uc1/application/HistoryService.java       |  16 +-
 .../Uc1KafkaStreamsBuilder.java               |   7 +-
 .../resources/META-INF/application.properties |   4 -
 .../uc2/application/AggregationService.java   |  15 +-
 .../Uc2KafkaStreamsBuilder.java               |   7 +-
 .../resources/META-INF/application.properties |   6 +-
 .../uc3/application/ConfigurationKeys.java    |  29 ---
 .../uc3/application/HistoryService.java       |  16 +-
 .../Uc3KafkaStreamsBuilder.java               |   7 +-
 .../resources/META-INF/application.properties |   4 -
 .../uc4/application/HistoryService.java       |  13 +-
 .../Uc4KafkaStreamsBuilder.java               |   7 +-
 .../resources/META-INF/application.properties |   4 -
 16 files changed, 94 insertions(+), 224 deletions(-)
 delete mode 100644 uc3-application/src/main/java/theodolite/uc3/application/ConfigurationKeys.java

diff --git a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java
index 6302e4c69..ca1838b84 100644
--- a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java
+++ b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java
@@ -9,12 +9,6 @@ public final class ConfigurationKeys {
 
   public static final String APPLICATION_VERSION = "application.version";
 
-  public static final String NUM_THREADS = "num.threads";
-
-  public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
-
-  public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
-
   public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
 
   public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
diff --git a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java
index 8c758c244..0bddef992 100644
--- a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java
+++ b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java
@@ -1,7 +1,9 @@
 package theodolite.commons.kafkastreams;
 
-import java.util.Objects;
 import java.util.Properties;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
@@ -12,109 +14,92 @@ import titan.ccp.common.kafka.streams.PropertiesBuilder;
  */
 public abstract class KafkaStreamsBuilder {
 
-  // Kafkastreams application specific
-  protected String schemaRegistryUrl; // NOPMD for use in subclass
+  // Kafka Streams application specific
+  protected final String schemaRegistryUrl; // NOPMD for use in subclass
+  protected final String inputTopic; // NOPMD for use in subclass
 
-  private String applicationName; // NOPMD
-  private String applicationVersion; // NOPMD
-  private String bootstrapServers; // NOPMD
-  private int numThreads = -1; // NOPMD
-  private int commitIntervalMs = -1; // NOPMD
-  private int cacheMaxBytesBuff = -1; // NOPMD
+  private final Configuration config;
 
-  /**
-   * Sets the application name for the {@code KafkaStreams} application. It is used to create the
-   * application ID.
-   *
-   * @param applicationName Name of the application.
-   * @return
-   */
-  public KafkaStreamsBuilder applicationName(final String applicationName) {
-    this.applicationName = applicationName;
-    return this;
-  }
-
-  /**
-   * Sets the application version for the {@code KafkaStreams} application. It is used to create the
-   * application ID.
-   *
-   * @param applicationVersion Version of the application.
-   * @return
-   */
-  public KafkaStreamsBuilder applicationVersion(final String applicationVersion) {
-    this.applicationVersion = applicationVersion;
-    return this;
-  }
+  private final String applicationName; // NOPMD
+  private final String applicationVersion; // NOPMD
+  private final String bootstrapServers; // NOPMD
 
   /**
-   * Sets the bootstrap servers for the {@code KafkaStreams} application.
+   * Construct a new Build object for a Kafka Streams application.
    *
-   * @param bootstrapServers String for a bootstrap server.
-   * @return
+   * @param config Contains the key value pairs for configuration.
    */
-  public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) {
-    this.bootstrapServers = bootstrapServers;
-    return this;
+  public KafkaStreamsBuilder(final Configuration config) {
+    this.config = config;
+    this.applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
+    this.applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
+    this.bootstrapServers = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    this.schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
+    this.inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
   }
 
   /**
-   * Sets the URL for the schema registry.
+   * Checks if the given key is contained in the configurations and sets it in the properties.
    *
-   * @param url The URL of the schema registry.
-   * @return
+   * @param <T> Type of the value for given key
+   * @param propBuilder Object where to set this property.
+   * @param key The key to check and set the property.
+   * @param valueGetter Method to get the value from with given key.
+   * @param condition for setting the property.
    */
-  public KafkaStreamsBuilder schemaRegistry(final String url) {
-    this.schemaRegistryUrl = url;
-    return this;
-  }
-
-  /**
-   * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus
-   * one for using the default.
-   *
-   * @param numThreads Number of threads. -1 for using the default.
-   * @return
-   */
-  public KafkaStreamsBuilder numThreads(final int numThreads) {
-    if (numThreads < -1 || numThreads == 0) {
-      throw new IllegalArgumentException("Number of threads must be greater 0 or -1.");
+  private <T> void setOptionalProperty(final PropertiesBuilder propBuilder,
+      final String key,
+      final Function<String, T> valueGetter,
+      final Predicate<T> condition) {
+    if (this.config.containsKey(key)) {
+      final T value = valueGetter.apply(key);
+      propBuilder.set(key, value, condition);
     }
-    this.numThreads = numThreads;
-    return this;
   }
 
   /**
-   * Sets the Kafka Streams property for the frequency with which to save the position (offsets in
-   * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for
-   * example, when processing bulks of records. Can be minus one for using the default.
+   * Build the {@link Properties} for a {@code KafkaStreams} application.
    *
-   * @param commitIntervalMs Frequency with which to save the position of tasks. In ms, -1 for using
-   *        the default.
-   * @return
+   * @return A {@code Properties} object.
    */
-  public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) {
-    if (commitIntervalMs < -1) {
-      throw new IllegalArgumentException("Commit interval must be greater or equal -1.");
+  protected Properties buildProperties() {
+    // required configuration
+    final PropertiesBuilder propBuilder = PropertiesBuilder
+        .bootstrapServers(this.bootstrapServers)
+        .applicationId(this.applicationName + '-' + this.applicationVersion);
+
+    // optional configurations
+    this.setOptionalProperty(propBuilder, StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG,
+        this.config::getLong,
+        p -> p >= 0);
+    this.setOptionalProperty(propBuilder, StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG,
+        this.config::getInt, p -> p > 0);
+    this.setOptionalProperty(propBuilder, StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
+        this.config::getInt,
+        p -> p >= 0);
+    this.setOptionalProperty(propBuilder, StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
+        this.config::getInt, p -> p >= 0);
+    this.setOptionalProperty(propBuilder, StreamsConfig.MAX_TASK_IDLE_MS_CONFIG,
+        this.config::getLong,
+        p -> p >= 0);
+    this.setOptionalProperty(propBuilder, StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG,
+        this.config::getInt, p -> p >= 1);
+    this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
+        this.config::getInt, p -> p >= 0);
+    this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STREAM_THREADS_CONFIG,
+        this.config::getInt, p -> p > 0);
+    this.setOptionalProperty(propBuilder, StreamsConfig.POLL_MS_CONFIG,
+        this.config::getLong,
+        p -> p >= 0);
+    this.setOptionalProperty(propBuilder, StreamsConfig.REPLICATION_FACTOR_CONFIG,
+        this.config::getInt, p -> p >= 0);
+
+    if (this.config.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION)
+        && this.config.getBoolean(StreamsConfig.TOPOLOGY_OPTIMIZATION)) {
+      propBuilder.set(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
     }
-    this.commitIntervalMs = commitIntervalMs;
-    return this;
-  }
 
-  /**
-   * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches
-   * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for
-   * example, when processing bulks of records. Can be minus one for using the default.
-   *
-   * @param cacheMaxBytesBuffering Number of memory bytes to be used for record caches across all
-   *        threads. -1 for using the default.
-   * @return
-   */
-  public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) {
-    if (cacheMaxBytesBuffering < -1) {
-      throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1.");
-    }
-    this.cacheMaxBytesBuff = cacheMaxBytesBuffering;
-    return this;
+    return propBuilder.build();
   }
 
   /**
@@ -124,31 +109,10 @@ public abstract class KafkaStreamsBuilder {
    */
   protected abstract Topology buildTopology();
 
-  /**
-   * Build the {@link Properties} for a {@code KafkaStreams} application.
-   *
-   * @return A {@code Properties} object.
-   */
-  protected Properties buildProperties() {
-    return PropertiesBuilder
-        .bootstrapServers(this.bootstrapServers)
-        .applicationId(this.applicationName + '-' + this.applicationVersion)
-        .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
-        .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0)
-        .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0)
-        .build();
-  }
-
   /**
    * Builds the {@link KafkaStreams} instance.
    */
   public KafkaStreams build() {
-    // Check for required attributes for building properties.
-    Objects.requireNonNull(this.applicationName, "Application name has not been set.");
-    Objects.requireNonNull(this.applicationVersion, "Application version has not been set.");
-    Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set.");
-    Objects.requireNonNull(this.schemaRegistryUrl, "Schema registry has not been set.");
-
     // Create the Kafka streams instance.
     return new KafkaStreams(this.buildTopology(), this.buildProperties());
   }
diff --git a/build.gradle b/build.gradle
index e0c733b24..3cb86b68e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -101,6 +101,7 @@ configure(commonProjects) {
       implementation 'org.slf4j:slf4j-simple:1.7.25'
       implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
       implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
+      implementation 'org.apache.kafka:kafka-streams:2.6.0'
 
       // Use JUnit test framework
       testImplementation 'junit:junit:4.12'
diff --git a/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java b/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java
index a35cc37b3..f0d8062a2 100644
--- a/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java
+++ b/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java
@@ -3,7 +3,6 @@ package theodolite.uc1.application;
 import java.util.concurrent.CompletableFuture;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.streams.KafkaStreams;
-import theodolite.commons.kafkastreams.ConfigurationKeys;
 import theodolite.uc1.streamprocessing.Uc1KafkaStreamsBuilder;
 import titan.ccp.common.configuration.ServiceConfigurations;
 
@@ -31,18 +30,9 @@ public class HistoryService {
    */
   private void createKafkaStreamsApplication() {
 
-    final Uc1KafkaStreamsBuilder uc1KafkaStreamsBuilder = new Uc1KafkaStreamsBuilder();
-    uc1KafkaStreamsBuilder.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC));
-
-    final KafkaStreams kafkaStreams = uc1KafkaStreamsBuilder
-        .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME))
-        .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION))
-        .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
-        .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
-        .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
-        .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
-        .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL))
-        .build();
+    final Uc1KafkaStreamsBuilder uc1KafkaStreamsBuilder = new Uc1KafkaStreamsBuilder(this.config);
+
+    final KafkaStreams kafkaStreams = uc1KafkaStreamsBuilder.build();
 
     this.stopEvent.thenRun(kafkaStreams::close);
 
diff --git a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java
index 7699ecb48..143352828 100644
--- a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java
+++ b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java
@@ -1,6 +1,7 @@
 package theodolite.uc1.streamprocessing;
 
 import java.util.Objects;
+import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.streams.Topology;
 import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
 import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
@@ -9,11 +10,9 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
  * Builder for the Kafka Streams configuration.
  */
 public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder {
-  private String inputTopic; // NOPMD
 
-  public KafkaStreamsBuilder inputTopic(final String inputTopic) {
-    this.inputTopic = inputTopic;
-    return this;
+  public Uc1KafkaStreamsBuilder(final Configuration config) {
+    super(config);
   }
 
   @Override
diff --git a/uc1-application/src/main/resources/META-INF/application.properties b/uc1-application/src/main/resources/META-INF/application.properties
index 3fb301516..b46e6246e 100644
--- a/uc1-application/src/main/resources/META-INF/application.properties
+++ b/uc1-application/src/main/resources/META-INF/application.properties
@@ -3,10 +3,6 @@ application.version=0.0.1
 
 kafka.bootstrap.servers=localhost:9092
 kafka.input.topic=input
-kafka.output.topic=output
 
 schema.registry.url=http://localhost:8091
 
-num.threads=1
-commit.interval.ms=100
-cache.max.bytes.buffering=-1
diff --git a/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java
index c094adfcd..2f828278f 100644
--- a/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java
+++ b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java
@@ -36,26 +36,15 @@ public class AggregationService {
    * @param clusterSession the database session which the application should use.
    */
   private void createKafkaStreamsApplication() {
-    // Use case specific stream configuration
-    final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder();
+    final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(this.config);
     uc2KafkaStreamsBuilder
-        .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
         .feedbackTopic(this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC))
         .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
         .configurationTopic(this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC))
         .emitPeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS)))
         .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS)));
 
-    // Configuration of the stream application
-    final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder
-        .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME))
-        .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION))
-        .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
-        .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL))
-        .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
-        .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
-        .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
-        .build();
+    final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder.build();
 
     this.stopEvent.thenRun(kafkaStreams::close);
     kafkaStreams.start();
diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java
index 16addb851..1a606ee3d 100644
--- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java
+++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java
@@ -2,6 +2,7 @@ package theodolite.uc2.streamprocessing;
 
 import java.time.Duration;
 import java.util.Objects;
+import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.streams.Topology;
 import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
 import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
@@ -14,16 +15,14 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build
   private static final Duration EMIT_PERIOD_DEFAULT = Duration.ofSeconds(1);
   private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO;
 
-  private String inputTopic; // NOPMD
   private String feedbackTopic; // NOPMD
   private String outputTopic; // NOPMD
   private String configurationTopic; // NOPMD
   private Duration emitPeriod; // NOPMD
   private Duration gracePeriod; // NOPMD
 
-  public Uc2KafkaStreamsBuilder inputTopic(final String inputTopic) {
-    this.inputTopic = inputTopic;
-    return this;
+  public Uc2KafkaStreamsBuilder(final Configuration config) {
+    super(config);
   }
 
   public Uc2KafkaStreamsBuilder feedbackTopic(final String feedbackTopic) {
diff --git a/uc2-application/src/main/resources/META-INF/application.properties b/uc2-application/src/main/resources/META-INF/application.properties
index 10c47960a..8f1af5f59 100644
--- a/uc2-application/src/main/resources/META-INF/application.properties
+++ b/uc2-application/src/main/resources/META-INF/application.properties
@@ -10,8 +10,4 @@ kafka.output.topic=output
 schema.registry.url=http://localhost:8091
 
 emit.period.ms=5000
-grace.period.ms=0
-
-num.threads=1
-commit.interval.ms=100
-cache.max.bytes.buffering=-1
+grace.period.ms=0
\ No newline at end of file
diff --git a/uc3-application/src/main/java/theodolite/uc3/application/ConfigurationKeys.java b/uc3-application/src/main/java/theodolite/uc3/application/ConfigurationKeys.java
deleted file mode 100644
index ab6f08c01..000000000
--- a/uc3-application/src/main/java/theodolite/uc3/application/ConfigurationKeys.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package theodolite.uc3.application;
-
-/**
- * Keys to access configuration parameters.
- */
-public final class ConfigurationKeys {
-
-  public static final String APPLICATION_NAME = "application.name";
-
-  public static final String APPLICATION_VERSION = "application.version";
-
-  public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
-
-  public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
-
-  public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
-
-  public static final String NUM_THREADS = "num.threads";
-
-  public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
-
-  public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
-
-  public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
-
-  private ConfigurationKeys() {
-  }
-
-}
diff --git a/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java b/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java
index b245b1645..349512f98 100644
--- a/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java
+++ b/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java
@@ -34,23 +34,13 @@ public class HistoryService {
    *
    */
   private void createKafkaStreamsApplication() {
-    // Use case specific stream configuration
-    final Uc3KafkaStreamsBuilder uc3KafkaStreamsBuilder = new Uc3KafkaStreamsBuilder();
+    final Uc3KafkaStreamsBuilder uc3KafkaStreamsBuilder = new Uc3KafkaStreamsBuilder(this.config);
     uc3KafkaStreamsBuilder
-        .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
         .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
         .windowDuration(Duration.ofMinutes(this.windowDurationMinutes));
 
-    // Configuration of the stream application
-    final KafkaStreams kafkaStreams = uc3KafkaStreamsBuilder
-        .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME))
-        .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION))
-        .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
-        .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL))
-        .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
-        .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
-        .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
-        .build();
+    final KafkaStreams kafkaStreams = uc3KafkaStreamsBuilder.build();
+
     this.stopEvent.thenRun(kafkaStreams::close);
     kafkaStreams.start();
   }
diff --git a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java
index e74adf7c8..9ab4ea0a9 100644
--- a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java
+++ b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java
@@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing;
 
 import java.time.Duration;
 import java.util.Objects;
+import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.streams.Topology;
 import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
 import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
@@ -11,13 +12,11 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
  */
 public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder {
 
-  private String inputTopic; // NOPMD
   private String outputTopic; // NOPMD
   private Duration windowDuration; // NOPMD
 
-  public Uc3KafkaStreamsBuilder inputTopic(final String inputTopic) {
-    this.inputTopic = inputTopic;
-    return this;
+  public Uc3KafkaStreamsBuilder(final Configuration config) {
+    super(config);
   }
 
   public Uc3KafkaStreamsBuilder outputTopic(final String outputTopic) {
diff --git a/uc3-application/src/main/resources/META-INF/application.properties b/uc3-application/src/main/resources/META-INF/application.properties
index 2ceaf3722..011406f7e 100644
--- a/uc3-application/src/main/resources/META-INF/application.properties
+++ b/uc3-application/src/main/resources/META-INF/application.properties
@@ -7,7 +7,3 @@ kafka.output.topic=output
 kafka.window.duration.minutes=1
 
 schema.registry.url=http://localhost:8091
-
-num.threads=1
-commit.interval.ms=100
-cache.max.bytes.buffering=-1
diff --git a/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java
index 23af80573..12f35e8dc 100644
--- a/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java
+++ b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java
@@ -32,9 +32,8 @@ public class HistoryService {
    */
   private void createKafkaStreamsApplication() {
     // Use case specific stream configuration
-    final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder();
+    final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder(this.config);
     uc4KafkaStreamsBuilder
-        .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
         .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
         .aggregtionDuration(
             Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)))
@@ -42,15 +41,7 @@ public class HistoryService {
             Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)));
 
     // Configuration of the stream application
-    final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder
-        .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME))
-        .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION))
-        .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
-        .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL))
-        .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
-        .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
-        .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
-        .build();
+    final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder.build();
 
     this.stopEvent.thenRun(kafkaStreams::close);
     kafkaStreams.start();
diff --git a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java
index 7c9e2c4f7..bbbb04311 100644
--- a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java
+++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java
@@ -2,6 +2,7 @@ package theodolite.uc4.streamprocessing;
 
 import java.time.Duration;
 import java.util.Objects;
+import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.streams.Topology;
 import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
 import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
@@ -11,14 +12,12 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
  */
 public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder {
 
-  private String inputTopic; // NOPMD
   private String outputTopic; // NOPMD
   private Duration aggregtionDuration; // NOPMD
   private Duration aggregationAdvance; // NOPMD
 
-  public Uc4KafkaStreamsBuilder inputTopic(final String inputTopic) {
-    this.inputTopic = inputTopic;
-    return this;
+  public Uc4KafkaStreamsBuilder(final Configuration config) {
+    super(config);
   }
 
   public Uc4KafkaStreamsBuilder outputTopic(final String outputTopic) {
diff --git a/uc4-application/src/main/resources/META-INF/application.properties b/uc4-application/src/main/resources/META-INF/application.properties
index e577c880a..b46681533 100644
--- a/uc4-application/src/main/resources/META-INF/application.properties
+++ b/uc4-application/src/main/resources/META-INF/application.properties
@@ -8,7 +8,3 @@ aggregation.duration.days=30
 aggregation.advance.days=1
 
 schema.registry.url=http://localhost:8091
-
-num.threads=1
-commit.interval.ms=100
-cache.max.bytes.buffering=-1
-- 
GitLab