From 191b9eb12625ae90713318b96bf54433f08ab26c Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Thu, 11 Nov 2021 15:09:36 +0100
Subject: [PATCH] Add beam common project + plugin

---
 .../beam-commons/build.gradle                 | 36 +++++++++++
 .../commons/beam/AbstractBeamService.java     | 61 +++++++++++++++++++
 .../commons/beam/ConfigurationKeys.java       | 47 ++++++++++++++
 .../KafkaAggregatedPowerRecordReader.java     | 52 ++++++++++++++++
 .../src/main/groovy/theodolite.beam.gradle    |  2 +-
 theodolite-benchmarks/settings.gradle         |  1 +
 6 files changed, 198 insertions(+), 1 deletion(-)
 create mode 100644 theodolite-benchmarks/beam-commons/build.gradle
 create mode 100644 theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java
 create mode 100644 theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/ConfigurationKeys.java
 create mode 100644 theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java

diff --git a/theodolite-benchmarks/beam-commons/build.gradle b/theodolite-benchmarks/beam-commons/build.gradle
new file mode 100644
index 000000000..c3b07a8b3
--- /dev/null
+++ b/theodolite-benchmarks/beam-commons/build.gradle
@@ -0,0 +1,36 @@
+plugins {
+    id 'theodolite.java-commons'
+}
+
+repositories {
+  jcenter()
+  maven {
+    url "https://oss.sonatype.org/content/repositories/snapshots/"
+  }
+  maven {
+    url 'https://packages.confluent.io/maven/'
+  }
+}
+
+dependencies {
+  // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
+  implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
+  implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
+  implementation 'com.google.code.gson:gson:2.8.2'
+  implementation 'com.google.guava:guava:24.1-jre'
+  implementation 'org.slf4j:slf4j-simple:1.7.25'
+
+  implementation('org.apache.beam:beam-sdks-java-io-kafka:2.22.0'){
+    exclude group: 'org.apache.kafka', module: 'kafka-clients'
+  }
+  implementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
+  implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
+
+//    compile group: 'org.apache.beam', name: 'beam-runners-flink-1.12', version: '2.27.0'
+
+  runtimeOnly 'org.slf4j:slf4j-api:1.7.32'
+  runtimeOnly 'org.slf4j:slf4j-jdk14:1.7.32'
+
+  // Use JUnit test framework
+  testImplementation 'junit:junit:4.12'
+}
diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java
new file mode 100644
index 000000000..b91f47cfa
--- /dev/null
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java
@@ -0,0 +1,61 @@
+package theodolite.commons.beam;
+
+import java.util.Properties;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import titan.ccp.common.configuration.ServiceConfigurations;
+
+/**
+ * Abstraction of a beam microservice.
+ */
+public class AbstractBeamService {
+
+  // Application Configurations
+  public static final Configuration CONFIG = ServiceConfigurations.createWithDefaults();
+  public static final String APPLICATION_NAME =
+      CONFIG.getString(ConfigurationKeys.APPLICATION_NAME);
+
+  // Beam Pipeline
+  protected PipelineOptions options;
+
+  public AbstractBeamService(String[] args) {
+    options = PipelineOptionsFactory.fromArgs(args).create();
+    options.setJobName(APPLICATION_NAME);
+  }
+
+
+  /**
+   * Abstract main for a Beam Service.
+   */
+  public static void main(final String[] args) {
+    AbstractBeamService service = new AbstractBeamService(args);
+    service.run();
+  }
+
+  public void run() {
+  }
+
+  /**
+   * Builds a simple configuration for a Kafka consumer.
+   *
+   * @return the build Kafka consumer configuration.
+   */
+  public Properties buildConsumerConfig() {
+    Properties consumerConfig = new Properties();
+    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+        CONFIG.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
+    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        CONFIG
+            .getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
+    consumerConfig.put("schema.registry.url",
+        CONFIG.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
+
+    consumerConfig.put("specific.avro.reader",
+        CONFIG.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
+    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APPLICATION_NAME);
+    return consumerConfig;
+  }
+
+}
diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/ConfigurationKeys.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/ConfigurationKeys.java
new file mode 100644
index 000000000..2e60a1b4e
--- /dev/null
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/ConfigurationKeys.java
@@ -0,0 +1,47 @@
+package theodolite.commons.beam;
+
+/**
+ * Keys to access configuration parameters.
+ */
+public final class ConfigurationKeys {
+  // Common keys
+  public static final String APPLICATION_NAME = "application.name";
+
+  public static final String APPLICATION_VERSION = "application.version";
+
+  public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
+
+  public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
+
+  public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
+
+  // Additional topics
+  public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic";
+
+  public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
+
+  public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic";
+
+  // UC2
+  public static final String EMIT_PERIOD_MS = "emit.period.ms";
+
+  public static final String GRACE_PERIOD_MS = "grace.period.ms";
+
+  // UC3
+  public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
+
+  // UC4
+  public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
+
+  public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
+
+
+  // BEAM
+  public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit.config";
+  public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset.config";
+  public static final String SPECIFIC_AVRO_READER = "specific.avro.reader";
+
+  private ConfigurationKeys() {
+  }
+
+}
diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java
new file mode 100644
index 000000000..7b0913810
--- /dev/null
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java
@@ -0,0 +1,52 @@
+package theodolite.commons.beam.kafka;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import java.util.Map;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
+ */
+public class KafkaAggregatedPowerRecordReader extends
+    PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
+
+  private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
+
+
+  /**
+   * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public KafkaAggregatedPowerRecordReader(String bootstrapServer, String inputTopic,
+                                          Map<Object, Object> consumerConfig) {
+    super();
+
+    // Check if boostrap server and inputTopic are defined
+    if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) {
+      throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
+    }
+
+    reader =
+        KafkaIO.<String, ActivePowerRecord>read()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(inputTopic)
+            .withKeyDeserializer(StringDeserializer.class)
+            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
+                AvroCoder.of(ActivePowerRecord.class))
+            .withConsumerConfigUpdates(consumerConfig)
+            .withoutMetadata();
+  }
+
+  @Override
+  public PCollection<KV<String, ActivePowerRecord>> expand(PBegin input) {
+    return input.apply(this.reader);
+  }
+
+}
diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.gradle
index c39f2f5b4..ad9689a5e 100644
--- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.gradle
+++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.gradle
@@ -26,7 +26,7 @@ dependencies {
     implementation 'com.google.code.gson:gson:2.8.2'
     implementation 'com.google.guava:guava:24.1-jre'
     implementation 'org.slf4j:slf4j-simple:1.7.25'
-//    implementation project(':beam-commons')
+    implementation project(':beam-commons')
 
     implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
 //    compile group: 'org.apache.beam', name: 'beam-runners-flink-1.12', version: '2.27.0'
diff --git a/theodolite-benchmarks/settings.gradle b/theodolite-benchmarks/settings.gradle
index e28e811c1..7470f7a45 100644
--- a/theodolite-benchmarks/settings.gradle
+++ b/theodolite-benchmarks/settings.gradle
@@ -3,6 +3,7 @@ rootProject.name = 'theodolite-benchmarks'
 include 'load-generator-commons'
 include 'kstreams-commons'
 include 'flink-commons'
+include 'beam-commons'
 
 include 'uc1-load-generator'
 include 'uc1-kstreams'
-- 
GitLab