From e275c86fde404b7a72f673e86b52fe5ee4996654 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Wed, 17 Nov 2021 17:43:33 +0100
Subject: [PATCH] Restuctured UC1 Beam Samza

---
 .../uc1-beam-samza/build.gradle               |  25 +--
 .../java/application/Uc1ApplicationBeam.java  | 148 ------------------
 .../main/java/application/Uc1BeamSamza.java   |  46 ++++++
 .../resources/META-INF/application.properties |  16 ++
 4 files changed, 65 insertions(+), 170 deletions(-)
 delete mode 100644 theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1ApplicationBeam.java
 create mode 100644 theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java
 create mode 100644 theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties

diff --git a/theodolite-benchmarks/uc1-beam-samza/build.gradle b/theodolite-benchmarks/uc1-beam-samza/build.gradle
index 906023d42..18eec3bd7 100644
--- a/theodolite-benchmarks/uc1-beam-samza/build.gradle
+++ b/theodolite-benchmarks/uc1-beam-samza/build.gradle
@@ -1,30 +1,11 @@
 plugins {
-  id 'theodolite.kstreams'
+  id 'theodolite.beam'
 }
 
-allprojects {
-  repositories {
-    maven {
-      url 'https://packages.confluent.io/maven/'
-    }
-    mavenCentral()
-  }
-}
-
-
 dependencies {
-  compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
   compile group: 'org.apache.beam', name: 'beam-runners-samza', version: '2.22.0'
-
-  compile('org.apache.beam:beam-sdks-java-io-kafka:2.22.0'){
-    exclude group: 'org.apache.kafka', module: 'kafka-clients'
-  }
-  compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
-
-  runtime 'org.apache.beam:beam-runners-direct-java:2.22.0'
-  runtime 'org.slf4j:slf4j-api:1.7.32'
-  runtime 'org.slf4j:slf4j-jdk14:1.7.32'
+  compile project(':uc1-beam')
 }
 
 
-mainClassName = "application.Uc1ApplicationBeam"
+mainClassName = "application.Uc1BeamSamza"
diff --git a/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1ApplicationBeam.java b/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1ApplicationBeam.java
deleted file mode 100644
index 7434719a3..000000000
--- a/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1ApplicationBeam.java
+++ /dev/null
@@ -1,148 +0,0 @@
-package application;
-
-import com.google.gson.Gson;
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import java.util.HashMap;
-import org.apache.beam.runners.samza.SamzaRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import titan.ccp.model.records.ActivePowerRecord;
-
-/**
- * Implementation of the use case Database Storage using Apache Beam with the Samza Runner. To
- * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload
- * generator using the delayed_startup.sh script. Add
- * --configFactory=org.apache.samza.config.factories.PropertiesConfigFactory
- * --configFilePath=${workspace_loc:uc1-application-samza}/config/standalone_local.properties
- * --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=1024 --as program arguments. To
- * persist logs add ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File
- * under Standard Input Output in Common in the Run Configuration Start via Eclipse Run.
- */
-public final class Uc1ApplicationBeam {
-  private static final Logger LOGGER = LoggerFactory.getLogger(Uc1ApplicationBeam.class);
-  private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
-  private static final String INPUT = "INPUT";
-  private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
-  private static final String YES = "true";
-  private static final String USE_AVRO_READER = YES;
-  private static final String AUTO_COMMIT_CONFIG = YES;
-
-  /**
-  * Private constructor to avoid instantiation.
-  */
-  private Uc1ApplicationBeam() {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-  * Main method.
-  *
-  */
-  @SuppressWarnings({"unchecked", "rawtypes","unused"})
-  public static void main(final String[] args) {
-
-    // Set Configuration for Kafka
-    final String bootstrapServer =
-        System.getenv(BOOTSTRAP) == null ? "my-confluent-cp-kafka:9092"
-            : System.getenv(BOOTSTRAP);
-    final String inputTopic = System.getenv(INPUT) == null ? "input" : System.getenv(INPUT);
-    final String schemaRegistryUrl =
-        System.getenv(SCHEMA_REGISTRY) == null ? "http://my-confluent-cp-schema-registry:8081"
-            : System.getenv(SCHEMA_REGISTRY);
-    // Set consumer configuration for the schema registry and commits back to Kafka
-    final HashMap<String, Object> consumerConfig = new HashMap<>();
-    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG);
-    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-    consumerConfig.put("schema.registry.url", schemaRegistryUrl);
-    consumerConfig.put("specific.avro.reader", USE_AVRO_READER);
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application");
-
-    // Create Pipeline Options from args. Current Execution Parameters for local execution are:
-    // --configFactory=org.apache.samza.config.factories.PropertiesConfigFactory
-    // --configFilePath=${workspace_loc:uc1-application-samza}/config/standalone_local.properties
-    // --samzaExecutionEnvironment=STANDALONE
-    // --maxSourceParallelism=1024
-
-    final LogKeyValue logKeyValue = new LogKeyValue();
-
-    final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
-    options.setJobName("ucapplication");
-    options.setRunner(SamzaRunner.class);
-
-    final Pipeline pipeline = Pipeline.create(options);
-
-    final CoderRegistry cr = pipeline.getCoderRegistry();
-
-    // Set Coders for Classes that will be distributed
-    cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
-
-    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
-        KafkaIO.<String, ActivePowerRecord>read()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(inputTopic)
-            .withKeyDeserializer(StringDeserializer.class)
-            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
-                AvroCoder.of(ActivePowerRecord.class))
-            .withConsumerConfigUpdates(consumerConfig)
-            .withoutMetadata();
-
-    // Apply pipeline transformations
-    // Read from Kafka
-    pipeline.apply(kafka)
-        // Map to Gson
-        .apply(MapElements
-            .via(
-                new SimpleFunction<KV<String, ActivePowerRecord>, KV<String, String>>() {
-                  private transient Gson gsonObj = new Gson();
-
-                  @Override
-                  public KV<String, String> apply(
-                      final KV<String, ActivePowerRecord> kv) {
-                    if (this.gsonObj == null) {
-                      this.gsonObj = new Gson();
-                    }
-                    final String gson = this.gsonObj.toJson(kv.getValue());
-                    return KV.of(kv.getKey(), gson);
-                  }
-                }))
-        // Print to console
-        .apply(ParDo.of(logKeyValue));
-    // Start execution
-    pipeline.run().waitUntilFinish();
-  }
-
-  /**
-   * Logs all Logs all Key Value pairs..
-   */
-  @SuppressWarnings({"unused"})
-  private static class LogKeyValue extends DoFn<KV<String, String>,KV<String, String>> {
-    private static final long serialVersionUID = 4328743;
-
-    @ProcessElement
-    public void processElement(@Element final KV<String, String> kv,
-                               final OutputReceiver<KV<String, String>> out) {
-      if (LOGGER.isInfoEnabled()) {
-        LOGGER.info("Key: " + kv.getKey() + "Value: " + kv.getValue());
-      }
-    }
-  }
-}
-
-
-
diff --git a/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java b/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java
new file mode 100644
index 000000000..75bedead7
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java
@@ -0,0 +1,46 @@
+package application;
+
+import org.apache.beam.runners.samza.SamzaRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import theodolite.commons.beam.AbstractBeamService;
+
+/**
+ * Implementation of the use case Database Storage using Apache Beam with the Samza Runner. To
+ * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload
+ * generator using the delayed_startup.sh script. Add
+ * --configFactory=org.apache.samza.config.factories.PropertiesConfigFactory
+ * --configFilePath=${workspace_loc:uc1-application-samza}/config/standalone_local.properties
+ * --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=1024 --as program arguments. To
+ * persist logs add ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File
+ * under Standard Input Output in Common in the Run Configuration Start via Eclipse Run.
+ */
+public final class Uc1BeamSamza extends AbstractBeamService {
+
+  /**
+   * Private constructor setting specific options for this use case.
+   */
+  private Uc1BeamSamza(final String[] args) { //NOPMD
+    super(args);
+    this.options.setRunner(SamzaRunner.class);
+  }
+
+  /**
+   * Main method.
+   */
+  @SuppressWarnings({"unchecked", "rawtypes", "unused"})
+  public static void main(final String[] args) {
+
+    // Create application via configurations
+    final Uc1BeamSamza uc1 = new Uc1BeamSamza(args);
+
+    // Create pipeline with configurations
+    Uc1BeamPipeline pipeline = new Uc1BeamPipeline(uc1.options, uc1.getConfig());
+
+    // Submit job and start execution
+    pipeline.run().waitUntilFinish();
+  }
+}
+
+
+
diff --git a/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties
new file mode 100644
index 000000000..50db1510a
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties
@@ -0,0 +1,16 @@
+application.name=theodolite-uc1-application
+application.version=0.0.1
+
+kafka.bootstrap.servers=localhost:9092
+kafka.input.topic=input
+kafka.output.topic=output
+
+schema.registry.url=http://localhost:8081
+
+num.threads=1
+commit.interval.ms=1000
+cache.max.bytes.buffering=-1
+
+specific.avro.reader=True
+enable.auto.commit.config=True
+auto.offset.reset.config=earliest
\ No newline at end of file
-- 
GitLab