From ebf1330a38368b77cf6d45d434c08732f0d5a1bf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Sat, 19 Feb 2022 19:57:43 +0100
Subject: [PATCH] Draft for new service and pipeline factory

---
 .../application/AbstractPipelineFactory.java  | 37 ++++++++++++++++++
 .../main/java/application/BeamService.java    | 39 +++++++++++++++++++
 2 files changed, 76 insertions(+)
 create mode 100644 theodolite-benchmarks/uc1-beam/src/main/java/application/AbstractPipelineFactory.java
 create mode 100644 theodolite-benchmarks/uc1-beam/src/main/java/application/BeamService.java

diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/AbstractPipelineFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/AbstractPipelineFactory.java
new file mode 100644
index 000000000..bf2d1039b
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/AbstractPipelineFactory.java
@@ -0,0 +1,37 @@
+package application;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.commons.configuration2.Configuration;
+import titan.ccp.model.records.ActivePowerRecord;
+
+public abstract class AbstractPipelineFactory {
+
+  protected final Configuration configuration;
+
+  public AbstractPipelineFactory(final Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  public final Pipeline create(final PipelineOptions options) {
+    final Pipeline pipeline = Pipeline.create(options);
+    this.constructPipeline(pipeline);
+    this.registerCoders(pipeline.getCoderRegistry());
+    return pipeline;
+  }
+
+  private void constructPipeline(Pipeline pipeline) {
+    // pipeline.apply(kafka)
+    // .apply(Values.create())
+    // .apply(sinkType.create(config));
+  }
+
+  private void registerCoders(CoderRegistry registry) {
+    registry.registerCoderForClass(
+        ActivePowerRecord.class,
+        AvroCoder.of(ActivePowerRecord.SCHEMA$));
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/BeamService.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/BeamService.java
new file mode 100644
index 000000000..02bc29d88
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/BeamService.java
@@ -0,0 +1,39 @@
+package application;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.commons.configuration2.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import theodolite.commons.beam.ConfigurationKeys;
+import titan.ccp.common.configuration.ServiceConfigurations;
+
+public class BeamService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(BeamService.class);
+
+  private final Configuration config = ServiceConfigurations.createWithDefaults();
+  private final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
+
+  private final AbstractPipelineFactory pipelineFactory;
+  private final PipelineOptions pipelineOptions;
+
+  public BeamService(
+      AbstractPipelineFactory pipelineFactory,
+      Class<? extends PipelineRunner<?>> runner,
+      String[] args) {
+    this.pipelineFactory = pipelineFactory;
+    this.pipelineOptions = PipelineOptionsFactory.fromArgs(args).create();
+    this.pipelineOptions.setJobName(this.applicationName);
+    this.pipelineOptions.setRunner(runner);
+  }
+
+  public void run() {
+    LOGGER.info("Starting BeamService with pipeline options: {}", this.pipelineOptions.toString());
+    final Pipeline pipeline = this.pipelineFactory.create(this.config, this.pipelineOptions);
+    pipeline.run().waitUntilFinish();
+  }
+
+}
-- 
GitLab