From 363504abe7de1aa9e4cbd9988072c0336a53e905 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Fri, 19 Nov 2021 12:11:53 +0100
Subject: [PATCH] Add uc3-beam-flink with abstract Service + Pipeline

---
 theodolite-benchmarks/settings.gradle         |   1 +
 .../uc3-beam-flink/build.gradle               |  23 +--
 .../main/java/application/Uc3BeamFlink.java   | 134 +----------------
 .../resources/META-INF/application.properties |  22 +++
 theodolite-benchmarks/uc3-beam/build.gradle   |   5 +
 .../main/java/application/HourOfDayKey.java   |   1 -
 .../java/application/HourOfDayWithStats.java  |  17 +++
 .../java/application/Uc3BeamPipeline.java     | 135 ++++++++++++++++++
 8 files changed, 190 insertions(+), 148 deletions(-)
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/resources/META-INF/application.properties
 create mode 100644 theodolite-benchmarks/uc3-beam/build.gradle
 create mode 100644 theodolite-benchmarks/uc3-beam/src/main/java/application/HourOfDayWithStats.java
 create mode 100644 theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java

diff --git a/theodolite-benchmarks/settings.gradle b/theodolite-benchmarks/settings.gradle
index afbb472cf..904563db7 100644
--- a/theodolite-benchmarks/settings.gradle
+++ b/theodolite-benchmarks/settings.gradle
@@ -7,6 +7,7 @@ include 'beam-commons'
 
 include 'uc1-beam'
 include 'uc2-beam'
+include 'uc3-beam'
 
 include 'uc1-load-generator'
 include 'uc1-kstreams'
diff --git a/theodolite-benchmarks/uc3-beam-flink/build.gradle b/theodolite-benchmarks/uc3-beam-flink/build.gradle
index b202c53c0..1f318767d 100644
--- a/theodolite-benchmarks/uc3-beam-flink/build.gradle
+++ b/theodolite-benchmarks/uc3-beam-flink/build.gradle
@@ -1,30 +1,13 @@
 plugins {
-  id 'theodolite.kstreams'
+  id 'theodolite.beam'
 }
 
-allprojects {
-  repositories {
-    maven {
-      url 'https://packages.confluent.io/maven/'
-    }
-    mavenCentral()
-  }
-}
 
 dependencies {
-  compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
   compile group: 'org.apache.beam', name: 'beam-runners-flink-1.12', version: '2.27.0'
-
-  compile('org.apache.beam:beam-sdks-java-io-kafka:2.22.0'){
-    exclude group: 'org.apache.kafka', module: 'kafka-clients'
-  }
-  compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
-
-  runtime 'org.apache.beam:beam-runners-direct-java:2.22.0'
-  runtime 'org.slf4j:slf4j-api:1.7.32'
-  runtime 'org.slf4j:slf4j-jdk14:1.7.32'
+  compile project(':uc3-beam')
 }
 
 
 // This is the path of the main class, stored within ./src/main/java/
-mainClassName = 'application.Uc3ApplicationBeam'
+mainClassName = 'application.Uc3BeamFlink'
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3BeamFlink.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3BeamFlink.java
index 62b388935..0f74437d0 100644
--- a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3BeamFlink.java
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3BeamFlink.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.joda.time.Duration;
+import theodolite.commons.beam.AbstractBeamService;
 import titan.ccp.model.records.ActivePowerRecord;
 
 /**
@@ -40,25 +41,14 @@ import titan.ccp.model.records.ActivePowerRecord;
  * ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File under Standard
  * Input Output in Common in the Run Configuration Start via Eclipse Run.
  */
-public final class Uc3ApplicationBeam {
-
-  private static final String JOB_NAME = "Uc3Application";
-  private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
-  private static final String INPUT = "INPUT";
-  private static final String OUTPUT = "OUTPUT";
-  private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
-  private static final String YES = "true";
-  private static final String USE_AVRO_READER = YES;
-  private static final String AUTO_COMMIT_CONFIG = YES;
-  private static final String KAFKA_WINDOW_DURATION_DAYS  = "KAFKA_WINDOW_DURATION_MINUTES";
-  private static final String AGGREGATION_ADVANCE_DAYS  = "AGGREGATION_ADVANCE_DAYS";
-  private static final String TRIGGER_INTERVAL  = "TRIGGER_INTERVAL";
+public final class Uc3BeamFlink extends AbstractBeamService {
 
   /**
    * Private constructor to avoid instantiation.
    */
-  private Uc3ApplicationBeam() {
-    throw new UnsupportedOperationException();
+  private Uc3BeamFlink(final String[] args) { //NOPMD
+    super(args);
+    this.options.setRunner(FlinkRunner.class);
   }
 
   /**
@@ -66,122 +56,12 @@ public final class Uc3ApplicationBeam {
    */
   public static void main(final String[] args) {
 
-    // Set Configuration for Windows
-    final int windowDuration = Integer.parseInt(
-        System.getenv(KAFKA_WINDOW_DURATION_DAYS) == null
-            ? "30" : System.getenv(KAFKA_WINDOW_DURATION_DAYS));
-    final Duration duration = Duration.standardDays(windowDuration);
-
-    final int aggregationAdvance = Integer.parseInt(
-        System.getenv(AGGREGATION_ADVANCE_DAYS) == null
-            ? "1" : System.getenv(AGGREGATION_ADVANCE_DAYS));
-    final Duration advance = Duration.standardDays(aggregationAdvance);
-
-    final int triggerInterval = Integer.parseInt(
-        System.getenv(TRIGGER_INTERVAL) == null
-            ? "15" : System.getenv(TRIGGER_INTERVAL));
-
-    final Duration triggerDelay = Duration.standardSeconds(triggerInterval);
-
-    // Set Configuration for Kafka
-    final String bootstrapServer =
-        System.getenv(BOOTSTRAP) == null ? "my-confluent-cp-kafka:9092"
-            : System.getenv(BOOTSTRAP);
-    final String inputTopic = System.getenv(INPUT) == null ? "input" : System.getenv(INPUT);
-    final String outputTopic = System.getenv(OUTPUT) == null ? "output" : System.getenv(OUTPUT);
-    final String schemaRegistryUrl =
-        System.getenv(SCHEMA_REGISTRY) == null ? "http://my-confluent-cp-schema-registry:8081"
-            : System.getenv(SCHEMA_REGISTRY);
-
-    final Map<String, Object> consumerConfig = buildConsumerConfig(schemaRegistryUrl);
-    final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
-
-    final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
-    options.setRunner(FlinkRunner.class);
-    options.setJobName(JOB_NAME);
-    final Pipeline pipeline = Pipeline.create(options);
-    final CoderRegistry cr = pipeline.getCoderRegistry();
-    registerCoders(cr);
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
-        KafkaIO.<String, ActivePowerRecord>read()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(inputTopic)
-            .withKeyDeserializer(StringDeserializer.class)
-            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
-                AvroCoder.of(ActivePowerRecord.class))
-            .withConsumerConfigUpdates(consumerConfig)
-            // Set TimeStampPolicy for event time
-            .withTimestampPolicyFactory(
-                (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark))
-            .withoutMetadata();
-    // Apply pipeline transformations
-    // Read from Kafka
-    pipeline.apply(kafka)
-        // Map to correct time format
-        .apply(MapElements.via(new MapTimeFormat()))
-
-        // Apply a sliding window
-        .apply(Window
-            .<KV<HourOfDayKey, ActivePowerRecord>>into(SlidingWindows.of(duration).every(advance))
-            .triggering(AfterWatermark.pastEndOfWindow()
-                .withEarlyFirings(
-                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(triggerDelay)))
-            .withAllowedLateness(Duration.ZERO)
-            .accumulatingFiredPanes())
-
-        // Aggregate per window for every key
-        .apply(Combine.<HourOfDayKey, ActivePowerRecord, Stats>perKey(
-            new StatsAggregation()))
-        .setCoder(KvCoder.of(new HourOfDaykeyCoder(), SerializableCoder.of(Stats.class)))
+    final Uc3BeamFlink uc3BeamFlink = new Uc3BeamFlink(args);
 
-        // Map into correct output format
-        .apply(MapElements
-            .via(new SimpleFunction<KV<HourOfDayKey, Stats>, KV<String, String>>() {
-              @Override
-              public KV<String, String> apply(final KV<HourOfDayKey, Stats> kv) {
-                return KV.of(keyFactory.getSensorId(kv.getKey()), kv.getValue().toString());
-              }
-            }))
-        // Write to Kafka
-        .apply(KafkaIO.<String, String>write()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(outputTopic)
-            .withKeySerializer(StringSerializer.class)
-            .withValueSerializer(StringSerializer.class));
+    final Uc3BeamPipeline pipeline = new Uc3BeamPipeline(uc3BeamFlink.options, uc3BeamFlink.getConfig());
 
     pipeline.run().waitUntilFinish();
   }
 
-  /**
-   * Builds a configuration for a Kafka consumer.
-   * @param schemaRegistryUrl the url to the SchemaRegistry.
-   * @return the configuration.
-   */
-  public static Map<String, Object> buildConsumerConfig(final String schemaRegistryUrl) {
-
-    // Set consumer configuration for the schema registry and commits back to Kafka
-    final HashMap<String, Object> consumerConfig = new HashMap<>();
-    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG);
-    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-    consumerConfig.put("schema.registry.url", schemaRegistryUrl);
-    consumerConfig.put("specific.avro.reader", USE_AVRO_READER);
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, JOB_NAME);
-
-    return consumerConfig;
-  }
-
-  /**
-   * Registers all Coders for all needed Coders.
-   * @param cr CoderRegistry.
-   */
-  private static void registerCoders(final CoderRegistry cr) {
-    cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
-    cr.registerCoderForClass(HourOfDayKey.class, new HourOfDaykeyCoder());
-    cr.registerCoderForClass(StatsAggregation.class,
-        SerializableCoder.of(StatsAggregation.class));
-    cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
-  }
 }
 
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc3-beam-flink/src/main/resources/META-INF/application.properties
new file mode 100644
index 000000000..6e9d22bca
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/resources/META-INF/application.properties
@@ -0,0 +1,22 @@
+application.name=theodolite-uc3-application
+application.version=0.0.1
+
+kafka.bootstrap.servers=localhost:9092
+kafka.input.topic=input
+kafka.output.topic=output
+kafka.window.duration.minutes=1
+
+schema.registry.url=http://localhost:8081
+
+aggregation.duration.days=30
+aggregation.advance.days=1
+
+trigger.interval=15
+
+num.threads=1
+commit.interval.ms=1000
+cache.max.bytes.buffering=-1
+
+specific.avro.reader=true
+enable.auto.commit.config=true
+auto.offset.reset.config=earliest
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc3-beam/build.gradle b/theodolite-benchmarks/uc3-beam/build.gradle
new file mode 100644
index 000000000..502e94fa7
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam/build.gradle
@@ -0,0 +1,5 @@
+plugins {
+  id 'theodolite.beam'
+}
+
+
diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/application/HourOfDayKey.java b/theodolite-benchmarks/uc3-beam/src/main/java/application/HourOfDayKey.java
index bd87abbe9..6db59dd65 100644
--- a/theodolite-benchmarks/uc3-beam/src/main/java/application/HourOfDayKey.java
+++ b/theodolite-benchmarks/uc3-beam/src/main/java/application/HourOfDayKey.java
@@ -7,7 +7,6 @@ import org.apache.beam.sdk.coders.DefaultCoder;
 /**
  * Composed key of an hour of the day and a sensor id.
  */
-
 @DefaultCoder(AvroCoder.class)
 public class HourOfDayKey {
 
diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/application/HourOfDayWithStats.java b/theodolite-benchmarks/uc3-beam/src/main/java/application/HourOfDayWithStats.java
new file mode 100644
index 000000000..b2d037419
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam/src/main/java/application/HourOfDayWithStats.java
@@ -0,0 +1,17 @@
+package application;
+
+import com.google.common.math.Stats;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Maps an {@link HourOfDayKey} with its aggregated {@link Stats} to a sensor-id keyed record.
+ */
+public class HourOfDayWithStats extends SimpleFunction<KV<HourOfDayKey, Stats>, KV<String, String>> {
+  private final HourOfDayKeyFactory keyFactory = new HourOfDayKeyFactory();
+
+  @Override
+  public KV<String, String> apply(final KV<HourOfDayKey, Stats> kv) {
+    return KV.of(keyFactory.getSensorId(kv.getKey()), kv.getValue().toString());
+  }
+}
diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java
new file mode 100644
index 000000000..e5a190fd9
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java
@@ -0,0 +1,135 @@
+package application;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import java.util.Properties;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.joda.time.Duration;
+import theodolite.commons.beam.AbstractPipeline;
+import theodolite.commons.beam.ConfigurationKeys;
+import theodolite.commons.beam.kafka.EventTimePolicy;
+import theodolite.commons.beam.kafka.KafkaWriterTransformation;
+import titan.ccp.model.records.ActivePowerRecord;
+
+
+/**
+ * Implementation of use case 3 (Aggregation based on Time Attributes) using Apache Beam.
+ * Reads {@link ActivePowerRecord}s from Kafka, aggregates the values per sensor and hour of
+ * the day within a sliding window, and writes the resulting statistics back to Kafka.
+ */
+public final class Uc3BeamPipeline extends AbstractPipeline {
+
+  /**
+   * Builds the UC3 pipeline.
+   *
+   * @param options runner-specific pipeline options.
+   * @param config the benchmark configuration (topics, window sizes, trigger interval).
+   */
+  protected Uc3BeamPipeline(final PipelineOptions options, final Configuration config) {
+    super(options, config);
+
+    final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
+
+    // Window configuration: total window length and how far consecutive windows advance.
+    final int windowDurationDays = Integer.parseInt(
+        config.getString(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
+    final Duration duration = Duration.standardDays(windowDurationDays);
+
+    final int aggregationAdvance = Integer.parseInt(
+        config.getString(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
+    final Duration aggregationAdvanceDuration = Duration.standardDays(aggregationAdvance);
+
+    // Delay in seconds after the first element in a pane before early results are emitted.
+    final int triggerInterval = Integer.parseInt(
+        config.getString(ConfigurationKeys.TRIGGER_INTERVAL));
+    final Duration triggerDelay = Duration.standardSeconds(triggerInterval);
+
+    // Build Kafka consumer configuration.
+    final Properties consumerConfig = buildConsumerConfig();
+
+    // Register coders for all classes that will be distributed among workers.
+    final CoderRegistry cr = this.getCoderRegistry();
+    registerCoders(cr);
+
+    // Source: read ActivePowerRecords from Kafka using event-time timestamps.
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
+        KafkaIO.<String, ActivePowerRecord>read()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(inputTopic)
+            .withKeyDeserializer(StringDeserializer.class)
+            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
+                AvroCoder.of(ActivePowerRecord.class))
+            .withConsumerConfigUpdates(consumerConfig)
+            // Set TimeStampPolicy for event time
+            .withTimestampPolicyFactory(
+                (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark))
+            .withoutMetadata();
+
+    // Transformations applied within the pipeline.
+    final MapTimeFormat mapTimeFormat = new MapTimeFormat();
+    final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats();
+
+    // Sink: write the aggregated statistics back to Kafka.
+    final PTransform<PCollection<KV<String, String>>, POutput> kafkaWriter =
+        new KafkaWriterTransformation(bootstrapServer, outputTopic, StringSerializer.class);
+
+    this.apply(kafka)
+        // Map to the composed key of sensor id and hour of the day
+        .apply(MapElements.via(mapTimeFormat))
+        // Apply a sliding window
+        .apply(Window
+            .<KV<HourOfDayKey, ActivePowerRecord>>into(
+                SlidingWindows.of(duration).every(aggregationAdvanceDuration))
+            .triggering(AfterWatermark.pastEndOfWindow()
+                .withEarlyFirings(
+                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(triggerDelay)))
+            .withAllowedLateness(Duration.ZERO)
+            .accumulatingFiredPanes())
+
+        // Aggregate per window for every key
+        .apply(Combine.<HourOfDayKey, ActivePowerRecord, Stats>perKey(
+            new StatsAggregation()))
+        .setCoder(KvCoder.of(new HourOfDaykeyCoder(), SerializableCoder.of(Stats.class)))
+
+        // Map into correct output format
+        .apply(MapElements.via(hourOfDayWithStats))
+        // Write to Kafka
+        .apply(kafkaWriter);
+  }
+
+
+  /**
+   * Registers coders for all classes that are transferred between workers.
+   *
+   * @param cr the {@link CoderRegistry} of this pipeline.
+   */
+  private static void registerCoders(final CoderRegistry cr) {
+    cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
+    cr.registerCoderForClass(HourOfDayKey.class, new HourOfDaykeyCoder());
+    cr.registerCoderForClass(StatsAggregation.class,
+        SerializableCoder.of(StatsAggregation.class));
+    cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
+  }
+}
+
-- 
GitLab