From 6ca1e53cad68656b664ad07f2be9783673b8a4f5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de>
Date: Thu, 14 May 2020 16:38:23 +0200
Subject: [PATCH] Implement an abstract KafkaStreamsBuilder class for streams
 applications

Create an abstract KafkaStreamsBuilder class that can be used in kafka
streams use cases.
This should reduce the redundancy for reconfigure the properties in
every streams application.
Now only the topology needs to be defined by the use cases.
---
 .../kafkastreams/KafkaStreamsBuilder.java     | 136 ++++++++++++++++++
 1 file changed, 136 insertions(+)
 create mode 100644 application-kafkastreams-commons/src/main/java/spesb/commons/kafkastreams/KafkaStreamsBuilder.java

diff --git a/application-kafkastreams-commons/src/main/java/spesb/commons/kafkastreams/KafkaStreamsBuilder.java b/application-kafkastreams-commons/src/main/java/spesb/commons/kafkastreams/KafkaStreamsBuilder.java
new file mode 100644
index 000000000..cad108881
--- /dev/null
+++ b/application-kafkastreams-commons/src/main/java/spesb/commons/kafkastreams/KafkaStreamsBuilder.java
@@ -0,0 +1,136 @@
+package spesb.commons.kafkastreams;
+
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import titan.ccp.common.kafka.streams.PropertiesBuilder;
+
+/**
+ * Builder for the Kafka Streams configuration.
+ */
+public abstract class KafkaStreamsBuilder {
+
+  // Kafkastreams application specific
+  private String applicationName; // NOPMD
+  private String applicationVersion; // NOPMD
+  private String bootstrapServers; // NOPMD
+  private int numThreads = -1; // NOPMD
+  private int commitIntervalMs = -1; // NOPMD
+  private int cacheMaxBytesBuff = -1; // NOPMD
+
+  /**
+   * Sets the application name for the {@code KafkaStreams} application. It is used to create the
+   * application ID.
+   *
+   * @param applicationName Name of the application.
+   * @return
+   */
+  public KafkaStreamsBuilder applicationName(final String applicationName) {
+    this.applicationName = applicationName;
+    return this;
+  }
+
+  /**
+   * Sets the application version for the {@code KafkaStreams} application. It is used to create the
+   * application ID.
+   *
+   * @param applicationVersion Version of the application.
+   * @return
+   */
+  public KafkaStreamsBuilder applicationVersion(final String applicationVersion) {
+    this.applicationVersion = applicationVersion;
+    return this;
+  }
+
+  /**
+   * Sets the bootstrap servers for the {@code KafkaStreams} application.
+   *
+   * @param bootstrapServers String for a bootstrap server.
+   * @return
+   */
+  public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) {
+    this.bootstrapServers = bootstrapServers;
+    return this;
+  }
+
+  /**
+   * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus
+   * one for using the default.
+   *
+   * @param numThreads Number of threads. -1 for using the default.
+   * @return
+   */
+  public KafkaStreamsBuilder numThreads(final int numThreads) {
+    if (numThreads < -1 || numThreads == 0) {
+      throw new IllegalArgumentException("Number of threads must be greater 0 or -1.");
+    }
+    this.numThreads = numThreads;
+    return this;
+  }
+
+  /**
+   * Sets the Kafka Streams property for the frequency with which to save the position (offsets in
+   * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for
+   * example, when processing bulks of records. Can be minus one for using the default.
+   *
+   * @param commitIntervalMs Frequency with which to save the position of tasks. In ms, -1 for using
+   *        the default.
+   * @return
+   */
+  public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) {
+    if (commitIntervalMs < -1) {
+      throw new IllegalArgumentException("Commit interval must be greater or equal -1.");
+    }
+    this.commitIntervalMs = commitIntervalMs;
+    return this;
+  }
+
+  /**
+   * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches
+   * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for
+   * example, when processing bulks of records. Can be minus one for using the default.
+   *
+   * @param cacheMaxBytesBuffering Number of memory bytes to be used for record caches across all
+   *        threads. -1 for using the default.
+   * @return
+   */
+  public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) {
+    if (cacheMaxBytesBuffering < -1) {
+      throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1.");
+    }
+    this.cacheMaxBytesBuff = cacheMaxBytesBuffering;
+    return this;
+  }
+
+  /**
+   * Method to implement a {@link Topology} for a {@code KafkaStreams} application.
+   *
+   * @return A {@code Topology} for a {@code KafkaStreams} application.
+   */
+  protected abstract Topology buildTopology();
+
+  /**
+   * Builds the {@link KafkaStreams} instance.
+   */
+  public KafkaStreams build() {
+    // Check for required attributes for building properties.
+    Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set.");
+    Objects.requireNonNull(this.applicationName, "Application name has not been set.");
+    Objects.requireNonNull(this.applicationVersion, "Application version has not been set.");
+
+    // Build properties.
+    final Properties properties = PropertiesBuilder
+        .bootstrapServers(this.bootstrapServers)
+        .applicationId(this.applicationName + '-' + this.applicationVersion)
+        .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
+        .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0)
+        .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0)
+        .build();
+
+    // Create the Kafka streams instance.
+    return new KafkaStreams(this.buildTopology(), properties);
+  }
+
+}
-- 
GitLab