From 786b874b126f3814da2c6f826e120b446659400d Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Fri, 19 Nov 2021 11:06:40 +0100
Subject: [PATCH] Add uc2-beam-flink + Put inputTopic and bootstrapServer
 variable inside AbstractPipeline

---
 .../commons/beam/AbstractPipeline.java        |   6 +
 theodolite-benchmarks/settings.gradle         |   1 +
 .../java/application/Uc1BeamPipeline.java     |   4 -
 .../uc2-beam-flink/build.gradle               |  24 +--
 .../java/application/Uc2ApplicationBeam.java  | 139 ------------------
 .../main/java/application/Uc2BeamFlink.java   |  45 ++++++
 theodolite-benchmarks/uc2-beam/build.gradle   |   5 +
 .../java/application/StatsAggregation.java    |   0
 .../main/java/application/StatsToString.java  |  17 +++
 .../java/application/Uc2BeamPipeline.java     |  89 +++++++++++
 .../resources/META-INF/application.properties |  17 +++
 11 files changed, 183 insertions(+), 164 deletions(-)
 delete mode 100644 theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2ApplicationBeam.java
 create mode 100644 theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java
 create mode 100644 theodolite-benchmarks/uc2-beam/build.gradle
 rename theodolite-benchmarks/{uc2-beam-flink => uc2-beam}/src/main/java/application/StatsAggregation.java (100%)
 create mode 100644 theodolite-benchmarks/uc2-beam/src/main/java/application/StatsToString.java
 create mode 100644 theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java
 create mode 100644 theodolite-benchmarks/uc2-beam/src/main/resources/META-INF/application.properties

diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java
index 7588e6abe..c75aa62d9 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java
@@ -14,9 +14,15 @@ public class AbstractPipeline extends Pipeline {
   // Application Configurations
   private final Configuration config;
 
+  protected final String inputTopic;
+  protected final String bootstrapServer;
+
   protected AbstractPipeline(final PipelineOptions options, final Configuration config) {
     super(options);
     this.config = config;
+
+    inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
+    bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
   }
 
   /**
diff --git a/theodolite-benchmarks/settings.gradle b/theodolite-benchmarks/settings.gradle
index d6db9202c..672b76a17 100644
--- a/theodolite-benchmarks/settings.gradle
+++ b/theodolite-benchmarks/settings.gradle
@@ -6,6 +6,7 @@ include 'flink-commons'
 include 'beam-commons'
 
 include 'uc1-beam'
+include 'uc2-beam'
 
 include 'uc1-load-generator'
 include 'uc1-kstreams'
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
index b0f6c8646..f43f50890 100644
--- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
@@ -33,12 +33,8 @@ public final class Uc1BeamPipeline extends AbstractPipeline {
 
   Uc1BeamPipeline(PipelineOptions options, Configuration config) {
     super(options, config);
-    // Additional needed fields
-    String inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
-    String bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
 
     // Set Coders for Classes that will be distributed
-
     final CoderRegistry cr = this.getCoderRegistry();
     cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
 
diff --git a/theodolite-benchmarks/uc2-beam-flink/build.gradle b/theodolite-benchmarks/uc2-beam-flink/build.gradle
index 00ee51e80..c552af040 100644
--- a/theodolite-benchmarks/uc2-beam-flink/build.gradle
+++ b/theodolite-benchmarks/uc2-beam-flink/build.gradle
@@ -1,29 +1,11 @@
 plugins {
-  id 'theodolite.kstreams'
-}
-
-allprojects {
-  repositories {
-    maven {
-      url 'https://packages.confluent.io/maven/'
-    }
-    mavenCentral()
-  }
+  id 'theodolite.beam'
 }
 
 
 dependencies {
-  compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
   compile group: 'org.apache.beam', name: 'beam-runners-flink-1.12', version: '2.27.0'
-
-  compile('org.apache.beam:beam-sdks-java-io-kafka:2.22.0'){
-    exclude group: 'org.apache.kafka', module: 'kafka-clients'
-  }
-  compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
-
-  runtime 'org.apache.beam:beam-runners-direct-java:2.22.0'
-  runtime 'org.slf4j:slf4j-api:1.7.32'
-  runtime 'org.slf4j:slf4j-jdk14:1.7.32'
+  compile project(':uc2-beam')
 }
 
-mainClassName = "application.Uc2ApplicationBeam"
+mainClassName = "application.Uc2BeamFlink"
diff --git a/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2ApplicationBeam.java b/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2ApplicationBeam.java
deleted file mode 100644
index e001d7219..000000000
--- a/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2ApplicationBeam.java
+++ /dev/null
@@ -1,139 +0,0 @@
-package application;
-
-import com.google.common.math.Stats;
-import com.google.common.math.StatsAccumulator;
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import java.util.HashMap;
-import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.joda.time.Duration;
-import titan.ccp.model.records.ActivePowerRecord;
-
-/**
- * Implementation of the use case Downsampling using Apache Beam with the Flink Runner. To execute
- * locally in standalone start Kafka, Zookeeper, the schema-registry and the workload generator
- * using the delayed_startup.sh script. Start a Flink cluster and pass its REST adress
- * using--flinkMaster as run parameter.
- */
-public final class Uc2ApplicationBeam {
-  private static final String JOB_NAME = "Uc2Application";
-  private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
-  private static final String INPUT = "INPUT";
-  private static final String OUTPUT = "OUTPUT";
-  private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
-  private static final String YES = "true";
-  private static final String USE_AVRO_READER = YES;
-  private static final String AUTO_COMMIT_CONFIG = YES;
-  private static final String KAFKA_WINDOW_DURATION_MINUTES  = "KAFKA_WINDOW_DURATION_MINUTES";
-
-  /**
-   * Private constructor to avoid instantiation.
-   */
-  private Uc2ApplicationBeam() {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Start running this microservice.
-   */
-  @SuppressWarnings({"serial", "unchecked", "rawtypes"})
-  public static void main(final String[] args) {
-
-    // Set Configuration for Windows
-    final int windowDurationMinutes = Integer.parseInt(
-        System.getenv(KAFKA_WINDOW_DURATION_MINUTES) == null
-            ? "1"
-            : System.getenv(KAFKA_WINDOW_DURATION_MINUTES));
-    final Duration duration = Duration.standardMinutes(windowDurationMinutes);
-
-    // Set Configuration for Kafka
-    final String bootstrapServer =
-        System.getenv(BOOTSTRAP) == null ? "my-confluent-cp-kafka:9092"
-            : System.getenv(BOOTSTRAP);
-    final String inputTopic = System.getenv(INPUT) == null ? "input" : System.getenv(INPUT);
-    final String outputTopic = System.getenv(OUTPUT) == null ? "output" : System.getenv(OUTPUT);
-    final String schemaRegistryUrl =
-        System.getenv(SCHEMA_REGISTRY) == null ? "http://my-confluent-cp-schema-registry:8081"
-        : System.getenv(SCHEMA_REGISTRY);
-
-    // Set consumer configuration for the schema registry and commits back to Kafka
-    final HashMap<String, Object> consumerConfig = new HashMap<>();
-    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG);
-    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-    consumerConfig.put("schema.registry.url", schemaRegistryUrl);
-    consumerConfig.put("specific.avro.reader", USE_AVRO_READER);
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "ucaplication");
-
-    // Create Pipeline Options from args.
-    final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
-    options.setJobName(JOB_NAME);
-    options.setRunner(FlinkRunner.class);
-
-
-    final Pipeline pipeline = Pipeline.create(options);
-    final CoderRegistry cr = pipeline.getCoderRegistry();
-
-    // Set Coders for Classes that will be distributed
-    cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
-    cr.registerCoderForClass(StatsAggregation.class,
-        SerializableCoder.of(StatsAggregation.class));
-    cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
-
-    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
-        KafkaIO.<String, ActivePowerRecord>read()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(inputTopic)
-            .withKeyDeserializer(StringDeserializer.class)
-            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
-                AvroCoder.of(ActivePowerRecord.class))
-            .withConsumerConfigUpdates(consumerConfig)
-            .withoutMetadata();
-    // Apply pipeline transformations
-    // Read from Kafka
-    pipeline.apply(kafka)
-        // Apply a fixed window
-        .apply(Window
-            .<KV<String, ActivePowerRecord>>into(FixedWindows.of(duration)))
-        // Aggregate per window for every key
-        .apply(Combine.<String, ActivePowerRecord, Stats>perKey(
-            new StatsAggregation()))
-        .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Stats.class)))
-        // Map into correct output format
-        .apply(MapElements
-            .via(new SimpleFunction<KV<String, Stats>, KV<String, String>>() {
-              @Override
-              public KV<String, String> apply(final KV<String, Stats> kv) {
-                return KV.of(kv.getKey(), kv.getValue().toString());
-              }
-            }))
-        // Write to Kafka
-        .apply(KafkaIO.<String, String>write()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(outputTopic)
-            .withKeySerializer(StringSerializer.class)
-            .withValueSerializer(StringSerializer.class));
-
-    pipeline.run().waitUntilFinish();
-  }
-}
-
diff --git a/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java b/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java
new file mode 100644
index 000000000..e777b7be3
--- /dev/null
+++ b/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java
@@ -0,0 +1,45 @@
+package application;
+
+import org.apache.beam.runners.flink.FlinkRunner;
+import org.apache.beam.sdk.Pipeline;
+import theodolite.commons.beam.AbstractBeamService;
+
+/**
+ * Implementation of the use case Downsampling using Apache Beam with the Flink Runner. To execute
+ * locally in standalone mode, start Kafka, Zookeeper, the schema registry and the workload
+ * generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST address
+ * using --flinkMaster as a run parameter.
+ */
+public final class Uc2BeamFlink extends AbstractBeamService {
+  private static final String JOB_NAME = "Uc2Application";
+  private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
+  private static final String INPUT = "INPUT";
+  private static final String OUTPUT = "OUTPUT";
+  private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
+  private static final String YES = "true";
+  private static final String USE_AVRO_READER = YES;
+  private static final String AUTO_COMMIT_CONFIG = YES;
+  private static final String KAFKA_WINDOW_DURATION_MINUTES = "KAFKA_WINDOW_DURATION_MINUTES";
+
+  /**
+   * Private constructor setting specific options for this use case.
+   */
+  private Uc2BeamFlink(final String[] args) { //NOPMD
+    super(args);
+    this.options.setRunner(FlinkRunner.class);
+  }
+
+  /**
+   * Start running this microservice.
+   */
+  @SuppressWarnings({"serial", "unchecked", "rawtypes"})
+  public static void main(final String[] args) {
+
+    Uc2BeamFlink uc2BeamFlink = new Uc2BeamFlink(args);
+
+    Pipeline pipeline = new Uc2BeamPipeline(uc2BeamFlink.options, uc2BeamFlink.getConfig());
+
+    pipeline.run().waitUntilFinish();
+  }
+}
+
diff --git a/theodolite-benchmarks/uc2-beam/build.gradle b/theodolite-benchmarks/uc2-beam/build.gradle
new file mode 100644
index 000000000..502e94fa7
--- /dev/null
+++ b/theodolite-benchmarks/uc2-beam/build.gradle
@@ -0,0 +1,5 @@
+plugins {
+  id 'theodolite.beam'
+}
+
+
diff --git a/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/StatsAggregation.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/StatsAggregation.java
similarity index 100%
rename from theodolite-benchmarks/uc2-beam-flink/src/main/java/application/StatsAggregation.java
rename to theodolite-benchmarks/uc2-beam/src/main/java/application/StatsAggregation.java
diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/StatsToString.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/StatsToString.java
new file mode 100644
index 000000000..cedecd8df
--- /dev/null
+++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/StatsToString.java
@@ -0,0 +1,17 @@
+package application;
+
+import com.google.common.math.Stats;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Transforms a {@code KV<String, Stats>} into a {@code KV<String, String>}.
+ */
+public class StatsToString extends SimpleFunction<KV<String, Stats>, KV<String, String>> {
+  private static final long serialVersionUID = 4308991244493097240L;
+
+  @Override
+  public KV<String, String> apply(final KV<String, Stats> kv) {
+    return KV.of(kv.getKey(), kv.getValue().toString());
+  }
+}
diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java
new file mode 100644
index 000000000..78988e4cb
--- /dev/null
+++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java
@@ -0,0 +1,89 @@
+package application;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+import java.util.Properties;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.joda.time.Duration;
+import theodolite.commons.beam.AbstractPipeline;
+import theodolite.commons.beam.ConfigurationKeys;
+import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader;
+import theodolite.commons.beam.kafka.KafkaWriterTransformation;
+import titan.ccp.model.records.ActivePowerRecord;
+
+
+/**
+ * Implementation of the use case Downsampling using Apache Beam with the Flink Runner. To
+ * execute locally in standalone mode, start Kafka, Zookeeper, the schema registry and the
+ * workload generator using the delayed_startup.sh script. Start a Flink cluster and pass its
+ * REST address using --flinkMaster as a run parameter. To persist logs add
+ * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard
+ * Input Output in Common in the Run Configuration Start via Eclipse Run.
+ */
+public final class Uc2BeamPipeline extends AbstractPipeline {
+
+  protected Uc2BeamPipeline(PipelineOptions options, Configuration config) {
+    super(options, config);
+    // Additional needed variables
+    String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
+
+    final int windowDurationMinutes = Integer.parseInt(
+        config.getString(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES));
+    final Duration duration = Duration.standardMinutes(windowDurationMinutes);
+
+    // Build kafka configuration
+    Properties consumerConfig = buildConsumerConfig();
+
+    // Set Coders for Classes that will be distributed
+    final CoderRegistry cr = this.getCoderRegistry();
+    cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
+    cr.registerCoderForClass(StatsAggregation.class,
+        SerializableCoder.of(StatsAggregation.class));
+    cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
+
+
+    // Read from Kafka
+    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>>
+        kafkaActivePowerRecordReader =
+        new KafkaActivePowerRecordReader(bootstrapServer, inputTopic, consumerConfig);
+
+    // Transform into String
+    final StatsToString statsToString = new StatsToString();
+
+    // Write to Kafka
+    final PTransform<PCollection<KV<String, String>>, POutput> kafkaWriter =
+        new KafkaWriterTransformation(bootstrapServer, outputTopic, StringSerializer.class);
+
+    // Apply pipeline transformations
+    this.apply(kafkaActivePowerRecordReader)
+        // Apply a fixed window
+        .apply(Window
+            .<KV<String, ActivePowerRecord>>into(FixedWindows.of(duration)))
+        // Aggregate per window for every key
+        .apply(Combine.<String, ActivePowerRecord, Stats>perKey(
+            new StatsAggregation()))
+        .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Stats.class)))
+        // Map into correct output format
+        .apply(MapElements
+            .via(statsToString))
+        // Write to Kafka
+        .apply(kafkaWriter);
+  }
+}
+
diff --git a/theodolite-benchmarks/uc2-beam/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc2-beam/src/main/resources/META-INF/application.properties
new file mode 100644
index 000000000..73fb96fea
--- /dev/null
+++ b/theodolite-benchmarks/uc2-beam/src/main/resources/META-INF/application.properties
@@ -0,0 +1,17 @@
+application.name=theodolite-uc2-application
+application.version=0.0.1
+
+kafka.bootstrap.servers=localhost:9092
+kafka.input.topic=input
+kafka.output.topic=output
+kafka.window.duration.minutes=1
+
+schema.registry.url=http://localhost:8081
+
+num.threads=1
+commit.interval.ms=1000
+cache.max.bytes.buffering=-1
+
+specific.avro.reader=True
+enable.auto.commit.config=True
+auto.offset.reset.config=earliest
\ No newline at end of file
-- 
GitLab