From abe9abcf7145bcfb5bfcc12c3aefb0bcf8e75dca Mon Sep 17 00:00:00 2001
From: ben <stu126940@mail.uni-kiel.de>
Date: Mon, 16 Mar 2020 16:23:51 +0100
Subject: [PATCH] add src folder

---
 .../uc3/application/ConfigurationKeys.java    |  25 +++++
 .../java/uc3/application/HistoryService.java  |  52 +++++++++
 .../streamprocessing/KafkaStreamsBuilder.java | 106 ++++++++++++++++++
 .../uc3/streamprocessing/TopologyBuilder.java |  54 +++++++++
 4 files changed, 237 insertions(+)
 create mode 100644 uc3-application/src/main/java/uc3/application/ConfigurationKeys.java
 create mode 100644 uc3-application/src/main/java/uc3/application/HistoryService.java
 create mode 100644 uc3-application/src/main/java/uc3/streamprocessing/KafkaStreamsBuilder.java
 create mode 100644 uc3-application/src/main/java/uc3/streamprocessing/TopologyBuilder.java

diff --git a/uc3-application/src/main/java/uc3/application/ConfigurationKeys.java b/uc3-application/src/main/java/uc3/application/ConfigurationKeys.java
new file mode 100644
index 000000000..884927979
--- /dev/null
+++ b/uc3-application/src/main/java/uc3/application/ConfigurationKeys.java
@@ -0,0 +1,25 @@
+package uc3.application;
+
+/**
+ * Keys to access configuration parameters.
+ */
+public final class ConfigurationKeys {
+
+	public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
+
+	public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
+
+	public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
+
+	public static final String NUM_THREADS = "num.threads";
+
+	public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
+
+	public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
+
+	public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
+
+	private ConfigurationKeys() {
+	}
+
+}
diff --git a/uc3-application/src/main/java/uc3/application/HistoryService.java b/uc3-application/src/main/java/uc3/application/HistoryService.java
new file mode 100644
index 000000000..e8385da58
--- /dev/null
+++ b/uc3-application/src/main/java/uc3/application/HistoryService.java
@@ -0,0 +1,52 @@
+package uc3.application;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.kafka.streams.KafkaStreams;
+import titan.ccp.common.configuration.Configurations;
+import uc3.streamprocessing.KafkaStreamsBuilder;
+
+/**
+ * A microservice that manages the history and, therefore, stores and aggregates
+ * incoming measurements.
+ *
+ */
+public class HistoryService {
+
+	private final Configuration config = Configurations.create();
+
+	private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
+
+	/**
+	 * Start the service.
+	 *
+	 * @return {@link CompletableFuture} which is completed when the service is
+	 *         successfully started.
+	 */
+	public void run() {
+		this.createKafkaStreamsApplication();
+	}
+
+	/**
+	 * Build and start the underlying Kafka Streams application of the service.
+	 *
+	 */
+	private void createKafkaStreamsApplication() {
+		final KafkaStreams kafkaStreams = new KafkaStreamsBuilder()
+				.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
+				.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
+				.outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
+				.windowDuration(Duration.ofMinutes(this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES)))
+				.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
+				.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
+				.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)).build();
+		this.stopEvent.thenRun(kafkaStreams::close);
+		kafkaStreams.start();
+	}
+
+	public static void main(final String[] args) {
+		new HistoryService().run();
+	}
+
+}
diff --git a/uc3-application/src/main/java/uc3/streamprocessing/KafkaStreamsBuilder.java b/uc3-application/src/main/java/uc3/streamprocessing/KafkaStreamsBuilder.java
new file mode 100644
index 000000000..5106ed2ad
--- /dev/null
+++ b/uc3-application/src/main/java/uc3/streamprocessing/KafkaStreamsBuilder.java
@@ -0,0 +1,106 @@
+package uc3.streamprocessing;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import titan.ccp.common.kafka.streams.PropertiesBuilder;
+
+/**
+ * Builder for the Kafka Streams configuration.
+ */
+public class KafkaStreamsBuilder {
+
+	private static final String APPLICATION_NAME = "titan-ccp-history";
+	private static final String APPLICATION_VERSION = "0.0.1";
+
+	// private static final Logger LOGGER =
+	// LoggerFactory.getLogger(KafkaStreamsBuilder.class);
+
+	private String bootstrapServers; // NOPMD
+	private String inputTopic; // NOPMD
+	private String outputTopic; // NOPMD
+	private Duration windowDuration; // NOPMD
+	private int numThreads = -1; // NOPMD
+	private int commitIntervalMs = -1; // NOPMD
+	private int cacheMaxBytesBuff = -1; // NOPMD
+
+	public KafkaStreamsBuilder inputTopic(final String inputTopic) {
+		this.inputTopic = inputTopic;
+		return this;
+	}
+
+	public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) {
+		this.bootstrapServers = bootstrapServers;
+		return this;
+	}
+
+	public KafkaStreamsBuilder outputTopic(final String outputTopic) {
+		this.outputTopic = outputTopic;
+		return this;
+	}
+
+	public KafkaStreamsBuilder windowDuration(final Duration windowDuration) {
+		this.windowDuration = windowDuration;
+		return this;
+	}
+
+	/**
+	 * Sets the Kafka Streams property for the number of threads
+	 * (num.stream.threads). Can be minus one for using the default.
+	 */
+	public KafkaStreamsBuilder numThreads(final int numThreads) {
+		if (numThreads < -1 || numThreads == 0) {
+			throw new IllegalArgumentException("Number of threads must be greater 0 or -1.");
+		}
+		this.numThreads = numThreads;
+		return this;
+	}
+
+	/**
+	 * Sets the Kafka Streams property for the frequency with which to save the
+	 * position (offsets in source topics) of tasks (commit.interval.ms). Must be
+	 * zero for processing all record, for example, when processing bulks of
+	 * records. Can be minus one for using the default.
+	 */
+	public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) {
+		if (commitIntervalMs < -1) {
+			throw new IllegalArgumentException("Commit interval must be greater or equal -1.");
+		}
+		this.commitIntervalMs = commitIntervalMs;
+		return this;
+	}
+
+	/**
+	 * Sets the Kafka Streams property for maximum number of memory bytes to be used
+	 * for record caches across all threads (cache.max.bytes.buffering). Must be
+	 * zero for processing all record, for example, when processing bulks of
+	 * records. Can be minus one for using the default.
+	 */
+	public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) {
+		if (cacheMaxBytesBuffering < -1) {
+			throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1.");
+		}
+		this.cacheMaxBytesBuff = cacheMaxBytesBuffering;
+		return this;
+	}
+
+	/**
+	 * Builds the {@link KafkaStreams} instance.
+	 */
+	public KafkaStreams build() {
+		Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
+		// TODO log parameters
+		final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
+				this.windowDuration);
+		final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers)
+				.applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter
+				.set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
+				.set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0)
+				.set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0)
+				.set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG").build();
+		return new KafkaStreams(topologyBuilder.build(), properties);
+	}
+
+}
diff --git a/uc3-application/src/main/java/uc3/streamprocessing/TopologyBuilder.java b/uc3-application/src/main/java/uc3/streamprocessing/TopologyBuilder.java
new file mode 100644
index 000000000..608c00940
--- /dev/null
+++ b/uc3-application/src/main/java/uc3/streamprocessing/TopologyBuilder.java
@@ -0,0 +1,54 @@
+package uc3.streamprocessing;
+
+import com.google.gson.Gson;
+import java.time.Duration;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
+import titan.ccp.models.records.ActivePowerRecordFactory;
+
+/**
+ * Builds Kafka Stream Topology for the History microservice.
+ */
+public class TopologyBuilder {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
+
+	private final String inputTopic;
+	private final String outputTopic;
+	private final Duration duration;
+	private final Gson gson;
+
+	private final StreamsBuilder builder = new StreamsBuilder();
+
+	/**
+	 * Create a new {@link TopologyBuilder} using the given topics.
+	 */
+	public TopologyBuilder(final String inputTopic, final String outputTopic, final Duration duration) {
+		this.inputTopic = inputTopic;
+		this.outputTopic = outputTopic;
+		this.duration = duration;
+		this.gson = new Gson();
+	}
+
+	/**
+	 * Build the {@link Topology} for the History microservice.
+	 */
+	public Topology build() {
+		this.builder
+				.stream(this.inputTopic,
+						Consumed.with(Serdes.String(), IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
+				.groupByKey().windowedBy(TimeWindows.of(this.duration))
+				.aggregate(() -> 0.0, (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
+						Materialized.with(Serdes.String(), Serdes.Double()))
+				.toStream().peek((k, v) -> System.out.printf("key %s, value %f \n", k, v)).to(this.outputTopic);
+
+		return this.builder.build();
+	}
+}
-- 
GitLab