From b531fed4716d310c00c30b196915286779c09104 Mon Sep 17 00:00:00 2001
From: ben <stu126940@mail.uni-kiel.de>
Date: Mon, 16 Mar 2020 12:19:47 +0100
Subject: [PATCH] add wrongly deleted package

---
 .../streamprocessing/KafkaStreamsBuilder.java | 92 +++++++++++++++++++
 .../uc1/streamprocessing/TopologyBuilder.java | 45 +++++++++
 2 files changed, 137 insertions(+)
 create mode 100644 uc1-application/src/main/java/uc1/streamprocessing/KafkaStreamsBuilder.java
 create mode 100644 uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java

diff --git a/uc1-application/src/main/java/uc1/streamprocessing/KafkaStreamsBuilder.java b/uc1-application/src/main/java/uc1/streamprocessing/KafkaStreamsBuilder.java
new file mode 100644
index 000000000..706cf7902
--- /dev/null
+++ b/uc1-application/src/main/java/uc1/streamprocessing/KafkaStreamsBuilder.java
@@ -0,0 +1,92 @@
+package uc1.streamprocessing;
+
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import titan.ccp.common.kafka.streams.PropertiesBuilder;
+
+/**
+ * Builder for the Kafka Streams configuration.
+ */
+public class KafkaStreamsBuilder {
+
+  private static final String APPLICATION_NAME = "titan-ccp-history";
+  private static final String APPLICATION_VERSION = "0.0.1";
+
+  // private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class);
+
+  private String bootstrapServers; // NOPMD
+  private String inputTopic; // NOPMD
+  private int numThreads = -1; // NOPMD
+  private int commitIntervalMs = -1; // NOPMD
+  private int cacheMaxBytesBuff = -1; // NOPMD
+
+  public KafkaStreamsBuilder inputTopic(final String inputTopic) {
+    this.inputTopic = inputTopic;
+    return this;
+  }
+
+  public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) {
+    this.bootstrapServers = bootstrapServers;
+    return this;
+  }
+
+  /**
+   * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus
+   * one for using the default.
+   */
+  public KafkaStreamsBuilder numThreads(final int numThreads) {
+    if (numThreads < -1 || numThreads == 0) {
+      throw new IllegalArgumentException("Number of threads must be greater 0 or -1.");
+    }
+    this.numThreads = numThreads;
+    return this;
+  }
+
+  /**
+   * Sets the Kafka Streams property for the frequency with which to save the position (offsets in
+   * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for
+   * example, when processing bulks of records. Can be minus one for using the default.
+   */
+  public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) {
+    if (commitIntervalMs < -1) {
+      throw new IllegalArgumentException("Commit interval must be greater or equal -1.");
+    }
+    this.commitIntervalMs = commitIntervalMs;
+    return this;
+  }
+
+  /**
+   * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches
+   * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for
+   * example, when processing bulks of records. Can be minus one for using the default.
+   */
+  public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) {
+    if (cacheMaxBytesBuffering < -1) {
+      throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1.");
+    }
+    this.cacheMaxBytesBuff = cacheMaxBytesBuffering;
+    return this;
+  }
+
+  /**
+   * Builds the {@link KafkaStreams} instance.
+   */
+  public KafkaStreams build() {
+    Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
+    // TODO log parameters
+    final TopologyBuilder topologyBuilder = new TopologyBuilder(
+        this.inputTopic);
+    final Properties properties = PropertiesBuilder
+        .bootstrapServers(this.bootstrapServers)
+        .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter
+        .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
+        .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0)
+        .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0)
+        .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
+        .build();
+    return new KafkaStreams(topologyBuilder.build(), properties);
+  }
+
+}
diff --git a/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java b/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java
new file mode 100644
index 000000000..140c592f4
--- /dev/null
+++ b/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java
@@ -0,0 +1,45 @@
+package uc1.streamprocessing;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
+import titan.ccp.models.records.ActivePowerRecordFactory;
+
+/**
+ * Builds Kafka Stream Topology for the History microservice.
+ */
+public class TopologyBuilder {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
+
+  private final String inputTopic;
+
+  private final StreamsBuilder builder = new StreamsBuilder();
+
+  /**
+   * Create a new {@link TopologyBuilder} using the given topics.
+   */
+  public TopologyBuilder(final String inputTopic) {
+    this.inputTopic = inputTopic;
+  }
+
+  /**
+   * Build the {@link Topology} for the History microservice.
+   */
+  public Topology build() {
+
+    this.builder
+        .stream(this.inputTopic, Consumed.with(
+            Serdes.String(),
+            IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
+        .mapValues(value -> value.getValueInW())
+        .foreach((key, measurement) -> LOGGER
+            .info("Key: " + key + " Value: " + measurement));
+
+    return this.builder.build();
+  }
+}
-- 
GitLab