From 66ae4c77bde18e623ce0518779ad7351d7a1005c Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Thu, 11 Nov 2021 15:10:52 +0100
Subject: [PATCH] Change uc1-beam-flink to use beam commons

---
 .../java/application/Uc1ApplicationBeam.java  | 68 ++++++++-----------
 .../resources/META-INF/application.properties | 16 +++++
 2 files changed, 43 insertions(+), 41 deletions(-)
 create mode 100644 theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties

diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java
index 081cbaedf..4cf6bf616 100644
--- a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java
+++ b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java
@@ -2,14 +2,12 @@ package application;
 
 import com.google.gson.Gson;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import java.util.HashMap;
+import java.util.Properties;
 import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -18,10 +16,11 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.beam.AbstractBeamService;
+import theodolite.commons.beam.ConfigurationKeys;
 import titan.ccp.model.records.ActivePowerRecord;
 
 /**
@@ -32,20 +31,20 @@ import titan.ccp.model.records.ActivePowerRecord;
  * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard
  * Input Output in Common in the Run Configuration Start via Eclipse Run.
  */
-public final class Uc1ApplicationBeam {
+public final class Uc1ApplicationBeam extends AbstractBeamService {
+
   private static final Logger LOGGER = LoggerFactory.getLogger(Uc1ApplicationBeam.class);
-  private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
-  private static final String INPUT = "INPUT";
-  private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
-  private static final String YES = "true";
-  private static final String USE_AVRO_READER = YES;
-  private static final String AUTO_COMMIT_CONFIG = YES;
+  private final String inputTopic = CONFIG.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
+  private final String bootstrapServer =
+      CONFIG.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
 
   /**
-   * Private constructor to avoid instantiation.
+   * Private constructor setting specific options for this use case.
    */
-  private Uc1ApplicationBeam() {
-    throw new UnsupportedOperationException();
+  private Uc1ApplicationBeam(final String[] args) { //NOPMD
+    super(args);
+    LOGGER.info(this.options.toString());
+    this.options.setRunner(FlinkRunner.class);
   }
 
   /**
@@ -55,45 +54,31 @@ public final class Uc1ApplicationBeam {
   @SuppressWarnings({"unchecked", "rawtypes","unused"})
   public static void main(final String[] args) {
 
-    // Set Configuration for Kafka
-    final String bootstrapServer =
-        System.getenv(BOOTSTRAP) == null ? "my-confluent-cp-kafka:9092"
-            : System.getenv(BOOTSTRAP);
-    final String inputTopic = System.getenv(INPUT) == null ? "input" : System.getenv(INPUT);
-    final String schemaRegistryUrl =
-        System.getenv(SCHEMA_REGISTRY) == null ? "http://my-confluent-cp-schema-registry:8081"
-            : System.getenv(SCHEMA_REGISTRY);
-    // Set consumer configuration for the schema registry and commits back to Kafka
-    final HashMap<String, Object> consumerConfig = new HashMap<>();
-    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG);
-    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-    consumerConfig.put("schema.registry.url", schemaRegistryUrl);
-    consumerConfig.put("specific.avro.reader", USE_AVRO_READER);
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application");
-
-    final LogKeyValue logKeyValue = new LogKeyValue();
-
-    final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
-    options.setJobName("ucapplication");
-    options.setRunner(FlinkRunner.class);
-
-    final Pipeline pipeline = Pipeline.create(options);
+    final Uc1ApplicationBeam uc1 = new Uc1ApplicationBeam(args);
 
-    final CoderRegistry cr = pipeline.getCoderRegistry();
+    // create pipeline
+    final Pipeline pipeline = Pipeline.create(uc1.options);
 
     // Set Coders for Classes that will be distributed
+    final CoderRegistry cr = pipeline.getCoderRegistry();
     cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
 
+    // build KafkaConsumerConfig
+    final Properties consumerConfig = uc1.buildConsumerConfig();
+
+    // Create Pipeline transformations
     final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
         KafkaIO.<String, ActivePowerRecord>read()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(inputTopic)
+            .withBootstrapServers(uc1.bootstrapServer)
+            .withTopic(uc1.inputTopic)
             .withKeyDeserializer(StringDeserializer.class)
             .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
                 AvroCoder.of(ActivePowerRecord.class))
             .withConsumerConfigUpdates(consumerConfig)
             .withoutMetadata();
 
+    final LogKeyValue logKeyValue = new LogKeyValue();
+
     // Apply pipeline transformations
     // Read from Kafka
     pipeline.apply(kafka)
@@ -119,11 +104,12 @@ public final class Uc1ApplicationBeam {
     pipeline.run().waitUntilFinish();
   }
 
+
   /**
    * Logs all Key Value pairs.
    */
   @SuppressWarnings({"unused"})
-  private static class LogKeyValue extends DoFn<KV<String, String>,KV<String, String>> {
+  private static class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> {
     private static final long serialVersionUID = 4328743;
 
     @ProcessElement
diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties
new file mode 100644
index 000000000..50db1510a
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties
@@ -0,0 +1,16 @@
+application.name=theodolite-uc1-application
+application.version=0.0.1
+
+kafka.bootstrap.servers=localhost:9092
+kafka.input.topic=input
+kafka.output.topic=output
+
+schema.registry.url=http://localhost:8081
+
+num.threads=1
+commit.interval.ms=1000
+cache.max.bytes.buffering=-1
+
+specific.avro.reader=True
+enable.auto.commit.config=True
+auto.offset.reset.config=earliest
\ No newline at end of file
-- 
GitLab