From 57ba13e7c49fdc3c7823ccd5a6987b167b2a2c3e Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Fri, 19 Nov 2021 11:17:07 +0100
Subject: [PATCH] Add uc2-beam-samza

---
 theodolite-benchmarks/settings.gradle         |  2 +-
 .../main/java/application/Uc2BeamFlink.java   | 13 +-----
 .../uc2-beam-samza/Dockerfile                 |  5 +++
 .../uc2-beam-samza/build.gradle               | 11 +++++
 .../main/java/application/Uc2BeamSamza.java   | 40 +++++++++++++++++++
 5 files changed, 59 insertions(+), 12 deletions(-)
 create mode 100644 theodolite-benchmarks/uc2-beam-samza/Dockerfile
 create mode 100644 theodolite-benchmarks/uc2-beam-samza/build.gradle
 create mode 100644 theodolite-benchmarks/uc2-beam-samza/src/main/java/application/Uc2BeamSamza.java

diff --git a/theodolite-benchmarks/settings.gradle b/theodolite-benchmarks/settings.gradle
index 672b76a17..afbb472cf 100644
--- a/theodolite-benchmarks/settings.gradle
+++ b/theodolite-benchmarks/settings.gradle
@@ -14,11 +14,11 @@ include 'uc1-flink'
 include 'uc1-beam-samza'
 include 'uc1-beam-flink'
 
-
 include 'uc2-load-generator'
 include 'uc2-kstreams'
 include 'uc2-flink'
 include 'uc2-beam-flink'
+include 'uc2-beam-samza'
 
 include 'uc3-load-generator'
 include 'uc3-kstreams'
diff --git a/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java b/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java
index e777b7be3..05728cc19 100644
--- a/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java
+++ b/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java
@@ -11,15 +11,6 @@ import theodolite.commons.beam.AbstractBeamService;
  * using--flinkMaster as run parameter.
  */
 public final class Uc2BeamFlink extends AbstractBeamService {
-  private static final String JOB_NAME = "Uc2Application";
-  private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
-  private static final String INPUT = "INPUT";
-  private static final String OUTPUT = "OUTPUT";
-  private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
-  private static final String YES = "true";
-  private static final String USE_AVRO_READER = YES;
-  private static final String AUTO_COMMIT_CONFIG = YES;
-  private static final String KAFKA_WINDOW_DURATION_MINUTES = "KAFKA_WINDOW_DURATION_MINUTES";
 
   /**
    * Private constructor setting specific options for this use case.
@@ -35,9 +26,9 @@ public final class Uc2BeamFlink extends AbstractBeamService {
   @SuppressWarnings({"serial", "unchecked", "rawtypes"})
   public static void main(final String[] args) {
 
-    Uc2BeamFlink uc2BeamFlink = new Uc2BeamFlink(args);
+    final Uc2BeamFlink uc2BeamFlink = new Uc2BeamFlink(args);
 
-    Pipeline pipeline = new Uc2BeamPipeline(uc2BeamFlink.options, uc2BeamFlink.getConfig());
+    final Pipeline pipeline = new Uc2BeamPipeline(uc2BeamFlink.options, uc2BeamFlink.getConfig());
 
     pipeline.run().waitUntilFinish();
   }
diff --git a/theodolite-benchmarks/uc2-beam-samza/Dockerfile b/theodolite-benchmarks/uc2-beam-samza/Dockerfile
new file mode 100644
index 000000000..baee343ad
--- /dev/null
+++ b/theodolite-benchmarks/uc2-beam-samza/Dockerfile
@@ -0,0 +1,5 @@
+FROM openjdk:8-slim
+
+ADD build/distributions/uc3-application-flink.tar /
+
+CMD /uc3-application-flink/bin/uc3-application-flink --runner=FlinkRunner --flinkMaster=flink-jobmanager:8081 --streaming --parallelism=$PARALLELISM  --disableMetrics=true --fasterCopy
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc2-beam-samza/build.gradle b/theodolite-benchmarks/uc2-beam-samza/build.gradle
new file mode 100644
index 000000000..2da9a14ff
--- /dev/null
+++ b/theodolite-benchmarks/uc2-beam-samza/build.gradle
@@ -0,0 +1,11 @@
+plugins {
+  id 'theodolite.beam'
+}
+
+
+dependencies {
+  compile group: 'org.apache.beam', name: 'beam-runners-samza', version: '2.22.0'
+  compile project(':uc2-beam')
+}
+
+mainClassName = "application.Uc2BeamSamza"
diff --git a/theodolite-benchmarks/uc2-beam-samza/src/main/java/application/Uc2BeamSamza.java b/theodolite-benchmarks/uc2-beam-samza/src/main/java/application/Uc2BeamSamza.java
new file mode 100644
index 000000000..aa18729a8
--- /dev/null
+++ b/theodolite-benchmarks/uc2-beam-samza/src/main/java/application/Uc2BeamSamza.java
@@ -0,0 +1,40 @@
+package application;
+
+import org.apache.beam.runners.samza.SamzaRunner;
+import org.apache.beam.sdk.Pipeline;
+import theodolite.commons.beam.AbstractBeamService;
+
+/**
+ * Implementation of the use case Downsampling using Apache Beam with the Samza Runner. To run
+ * locally in standalone start Kafka, Zookeeper, the schema-registry and the workload generator
+ * using the delayed_startup.sh script. Add
+ * --configFactory=org.apache.samza.config.factories.PropertiesConfigFactory
+ * --configFilePath=${workspace_loc:uc3-application-samza}/config/standalone_local.properties
+ * --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=1024 --as program arguments. To
+ * persist logs add ${workspace_loc:/uc3-application-samza/eclipseConsoleLogs.log} as Output File
+ * under Standard Input Output in Common in the Run Configuration Start via Eclipse Run.
+ */
+public final class Uc2BeamSamza extends AbstractBeamService {
+
+  /**
+   * Private constructor setting specific options for this use case.
+   */
+  private Uc2BeamSamza(final String[] args) { //NOPMD
+    super(args);
+    this.options.setRunner(SamzaRunner.class);
+  }
+
+  /**
+   * Start running this microservice.
+   */
+  @SuppressWarnings({"serial", "unchecked", "rawtypes"})
+  public static void main(final String[] args) {
+
+    final Uc2BeamSamza uc2BeamSamza = new Uc2BeamSamza(args);
+
+    final Pipeline pipeline = new Uc2BeamPipeline(uc2BeamSamza.options, uc2BeamSamza.getConfig());
+
+    pipeline.run().waitUntilFinish();
+  }
+}
+
-- 
GitLab