From cab6297c62a5d21d4df4d639f723bf4c4c3de4eb Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Thu, 12 May 2022 12:19:00 +0200
Subject: [PATCH] Refactor the AbstractFlinkService

+ Move buildPipeline() and configureEnv() to run method.
+ Extend Javadoc
+ Add constructor which can override the configuration as needed for integration tests
+ remove now unnecessary static from uc1 flink
+ Fix uc2 and uc4 broken build.gradle (ref to uc1/uc2)
---
 .../commons/flink/AbstractFlinkService.java   | 24 +++++++++++++++----
 .../uc1/flink/HistoryServiceFlinkJob.java     |  6 ++---
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java
index ac4e30352..ff75b6c9d 100644
--- a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java
+++ b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java
@@ -9,19 +9,20 @@ import titan.ccp.common.configuration.ServiceConfigurations;
 
 /**
  * A general Apache Flink-based microservice. It is configured by {@link #configureEnv()},
- * and extended by implementing business logic in {@link #buildPipeline()}
+ * and extended by implementing business logic in {@link #buildPipeline()}.
+ * The configuration of the serializer needs to be implemented in {@link #configureSerializers()}.
  */
 public abstract class AbstractFlinkService {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(AbstractFlinkService.class);
   protected final StreamExecutionEnvironment env;
 
-  protected final Configuration config = ServiceConfigurations.createWithDefaults();
+  protected Configuration config = ServiceConfigurations.createWithDefaults();
 
   protected final String applicationId;
 
   /**
-   * Abstract Service constructing and configuring the application.
+   * Abstract Service constructing the name and {@link StreamExecutionEnvironment}.
    */
   public AbstractFlinkService() {
     final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
@@ -30,11 +31,22 @@ public abstract class AbstractFlinkService {
 
     this.env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-    this.configureEnv(); //NOPMD
+  }
 
-    this.buildPipeline();
+  /**
+   *  Abstract Service constructing the name and {@link StreamExecutionEnvironment}.
+   * @param config the configuration for the service.
+   */
+  public AbstractFlinkService(final Configuration config) {
+    this.config = config;
+    final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
+    final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
+    this.applicationId = applicationName + "-" + applicationVersion;
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
   }
 
+
+
   /**
    * Configures the service using environment variables.
    */
@@ -86,6 +98,8 @@ public abstract class AbstractFlinkService {
    *  Starts the service.
    */
   public void run() {
+    this.configureEnv();
+    this.buildPipeline();
     LOGGER.info("Execution plan: {}", this.env.getExecutionPlan());
 
     try {
diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java
index c1b3278c3..a5e833b0b 100644
--- a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java
@@ -14,7 +14,7 @@ import titan.ccp.model.records.ActivePowerRecord;
  */
 public final class HistoryServiceFlinkJob extends AbstractFlinkService {
 
-  private static final DatabaseAdapter<String> DATABASE_ADAPTER = LogWriterFactory.forJson();
+  private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
 
   @Override
   public void configureEnv() {
@@ -45,9 +45,9 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
 
     stream
         // .rebalance()
-        .map(new ConverterAdapter<>(this.DATABASE_ADAPTER.getRecordConverter()))
+        .map(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter()))
         .returns(Types.STRING)
-        .flatMap(new WriterAdapter<>(this.DATABASE_ADAPTER.getDatabaseWriter()))
+        .flatMap(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter()))
         .returns(Types.VOID); // Will never be used
   }
 
-- 
GitLab