From 4513127c8385ef7057b60122b063ee379abb92de Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Mon, 9 May 2022 20:12:16 +0200
Subject: [PATCH] Refactor Flink benchmark jobs to use AbstractFlinkService

Move the execution environment setup (checkpointing, parallelism, state
backend), serializer registration, and job execution logic that was
duplicated across the four use case jobs into the shared
AbstractFlinkService base class. Each job now only implements
buildPipeline() and configureSerializers(); uc1 additionally overrides
configureEnv(), since it is stateless and needs no state backend.

---
 .../uc1/flink/HistoryServiceFlinkJob.java     | 62 ++++--------
 .../uc2/flink/HistoryServiceFlinkJob.java     | 63 +++---------
 .../uc3/flink/HistoryServiceFlinkJob.java     | 64 +++---------
 .../uc4/flink/AggregationServiceFlinkJob.java | 70 ++----------
 4 files changed, 39 insertions(+), 220 deletions(-)

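Notes for reviewers (this section between the diffstat and the first
diff is discarded by "git am" and does not become part of the commit):

AbstractFlinkService itself is not touched by this patch. To ease
review, the following is a minimal sketch of the template-method
structure the four jobs now rely on, reconstructed from the call sites
in this diff. Only the members config, env and applicationId and the
methods configureCheckpointing(), configureParallelism(),
configureEnv(), configureSerializers(), buildPipeline() and run() are
taken from the diff; everything else (constructor order, visibilities,
a commons-level ConfigurationKeys) is an assumption and may differ from
the real class in rocks.theodolite.benchmarks.commons.flink:

    // Hypothetical sketch for review orientation only -- not part of this patch.
    package rocks.theodolite.benchmarks.commons.flink;

    import org.apache.commons.configuration2.Configuration;
    import org.apache.flink.runtime.state.StateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import titan.ccp.common.configuration.ServiceConfigurations;

    public abstract class AbstractFlinkService {

      private static final Logger LOGGER = LoggerFactory.getLogger(AbstractFlinkService.class);

      protected final Configuration config = ServiceConfigurations.createWithDefaults();
      protected final StreamExecutionEnvironment env;
      protected final String applicationId;

      protected AbstractFlinkService() {
        this.applicationId = this.config.getString(ConfigurationKeys.APPLICATION_NAME)
            + "-" + this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Template methods run before any subclass field initializers, which is
        // why subclass fields used in buildPipeline() must be static (see uc1).
        this.configureEnv();
        this.configureSerializers();
        this.buildPipeline();
      }

      protected void configureCheckpointing() {
        if (this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true)) {
          this.env.enableCheckpointing(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS));
        }
      }

      protected void configureParallelism() {
        final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
        if (parallelism != null) {
          LOGGER.info("Set parallelism: {}.", parallelism);
          this.env.setParallelism(parallelism);
        }
      }

      protected void configureEnv() {
        this.configureCheckpointing();
        this.configureParallelism();
        final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
        this.env.setStateBackend(stateBackend);
      }

      protected abstract void configureSerializers();

      protected abstract void buildPipeline();

      public void run() {
        try {
          this.env.execute(this.applicationId);
        } catch (final Exception e) { // NOPMD Exception thrown by Flink
          LOGGER.error("An error occurred while running this job.", e);
        }
      }
    }
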
diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java
index d674effac..04f14017b 100644
--- a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java
@@ -1,63 +1,48 @@
 package rocks.theodolite.benchmarks.uc1.flink;
 
-import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService;
 import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory;
 import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
 import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
-import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;
 
 /**
  * The History microservice implemented as a Flink job.
  */
-public final class HistoryServiceFlinkJob {
+public final class HistoryServiceFlinkJob extends AbstractFlinkService {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
-
-  private final Configuration config = ServiceConfigurations.createWithDefaults();
-  private final StreamExecutionEnvironment env;
-  private final String applicationId;
-
-  private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
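+
+  // Must be static: the AbstractFlinkService constructor invokes buildPipeline()
+  // before this subclass's instance field initializers have run.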
+  private static final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
 
   /**
    * Create a new instance of the {@link HistoryServiceFlinkJob}.
    */
   public HistoryServiceFlinkJob() {
-    final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
-    final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    this.applicationId = applicationName + "-" + applicationVersion;
-
-    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-    this.configureEnv();
-
-    this.buildPipeline();
+    super();
   }
 
-  private void configureEnv() {
-    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
-    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
-    if (checkpointing) {
-      this.env.enableCheckpointing(commitIntervalMs);
-    }
-
-    // Parallelism
-    final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
-    if (parallelism != null) {
-      LOGGER.info("Set parallelism: {}.", parallelism);
-      this.env.setParallelism(parallelism);
-    }
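+  // UC1 is stateless; unlike the other jobs, no state backend is configured.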
+  @Override
+  public void configureEnv() {
+    super.configureCheckpointing();
+    super.configureParallelism();
+  }
 
+  @Override
+  protected void configureSerializers() {
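+    // UC1 does not register any custom serializers.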
   }
 
-  private void buildPipeline() {
+  @Override
+  protected void buildPipeline() {
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
@@ -79,17 +64,6 @@ public final class HistoryServiceFlinkJob {
         .returns(Types.VOID); // Will never be used
   }
 
-  /**
-   * Start running this microservice.
-   */
-  public void run() {
-    try {
-      this.env.execute(this.applicationId);
-    } catch (final Exception e) { // NOPMD Execution thrown by Flink
-      LOGGER.error("An error occured while running this job.", e);
-    }
-  }
-
   public static void main(final String[] args) {
     new HistoryServiceFlinkJob().run();
   }
diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java
index 7e67be897..8b8d00ff0 100644
--- a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java
@@ -1,11 +1,8 @@
 package rocks.theodolite.benchmarks.uc2.flink;
 
 import com.google.common.math.Stats;
-import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
@@ -13,61 +10,28 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService;
 import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory;
-import rocks.theodolite.benchmarks.commons.flink.StateBackends;
 import rocks.theodolite.benchmarks.commons.flink.serialization.StatsSerializer;
-import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;
 
 
 /**
  * The History microservice implemented as a Flink job.
  */
-public final class HistoryServiceFlinkJob {
+public final class HistoryServiceFlinkJob extends AbstractFlinkService {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
 
-  private final Configuration config = ServiceConfigurations.createWithDefaults();
-  private final StreamExecutionEnvironment env;
-  private final String applicationId;
-
   /**
    * Create a new instance of the {@link HistoryServiceFlinkJob}.
    */
   public HistoryServiceFlinkJob() {
-    final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
-    final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    this.applicationId = applicationName + "-" + applicationVersion;
-
-    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-    this.configureEnv();
-
-    this.buildPipeline();
-  }
-
-  private void configureEnv() {
-    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
-    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
-    if (checkpointing) {
-      this.env.enableCheckpointing(commitIntervalMs);
-    }
-
-    // Parallelism
-    final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
-    if (parallelism != null) {
-      LOGGER.info("Set parallelism: {}.", parallelism);
-      this.env.setParallelism(parallelism);
-    }
-
-    // State Backend
-    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
-    this.env.setStateBackend(stateBackend);
-
-    this.configureSerializers();
+    super();
   }
 
-  private void configureSerializers() {
+  @Override
+  protected void configureSerializers() {
     this.env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
     this.env.getConfig().getRegisteredTypesWithKryoSerializers()
         .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
@@ -75,7 +39,8 @@ public final class HistoryServiceFlinkJob {
 
   }
 
-  private void buildPipeline() {
+  @Override
+  protected void buildPipeline() {
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
@@ -112,20 +77,6 @@ public final class HistoryServiceFlinkJob {
         .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
   }
 
-
-  /**
-   * Start running this microservice.
-   */
-  public void run() {
-    LOGGER.info("Execution plan: {}", this.env.getExecutionPlan());
-
-    try {
-      this.env.execute(this.applicationId);
-    } catch (final Exception e) { // NOPMD Execution thrown by Flink
-      LOGGER.error("An error occured while running this job.", e);
-    }
-  }
-
   public static void main(final String[] args) {
     new HistoryServiceFlinkJob().run();
   }
diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java
index 4cf7ed080..e0c25c83f 100644
--- a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java
@@ -4,12 +4,9 @@ import com.google.common.math.Stats;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
-import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
@@ -17,64 +14,31 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService;
 import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory;
-import rocks.theodolite.benchmarks.commons.flink.StateBackends;
 import rocks.theodolite.benchmarks.commons.flink.serialization.StatsSerializer;
 import rocks.theodolite.benchmarks.uc3.flink.util.HourOfDayKey;
 import rocks.theodolite.benchmarks.uc3.flink.util.HourOfDayKeyFactory;
 import rocks.theodolite.benchmarks.uc3.flink.util.HourOfDayKeySerde;
 import rocks.theodolite.benchmarks.uc3.flink.util.StatsKeyFactory;
-import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;
 
 /**
  * The History microservice implemented as a Flink job.
  */
-public final class HistoryServiceFlinkJob {
+public final class HistoryServiceFlinkJob extends AbstractFlinkService {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
 
-  private final Configuration config = ServiceConfigurations.createWithDefaults();
-  private final StreamExecutionEnvironment env;
-  private final String applicationId;
-
   /**
    * Create a new instance of the {@link HistoryServiceFlinkJob}.
    */
   public HistoryServiceFlinkJob() {
-    final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
-    final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    this.applicationId = applicationName + "-" + applicationVersion;
-
-    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-    this.configureEnv();
-
-    this.buildPipeline();
-  }
-
-  private void configureEnv() {
-    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
-    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
-    if (checkpointing) {
-      this.env.enableCheckpointing(commitIntervalMs);
-    }
-
-    // Parallelism
-    final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
-    if (parallelism != null) {
-      LOGGER.error("Set parallelism: {}.", parallelism);
-      this.env.setParallelism(parallelism);
-    }
-
-    // State Backend
-    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
-    this.env.setStateBackend(stateBackend);
-
-    this.configureSerializers();
+    super();
   }
 
-  private void configureSerializers() {
+  @Override
+  protected void configureSerializers() {
     this.env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class,
         new HourOfDayKeySerde());
     this.env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
@@ -86,7 +50,8 @@ public final class HistoryServiceFlinkJob {
     }
   }
 
-  private void buildPipeline() {
+  @Override
+  protected void buildPipeline() {
     // Configurations
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
@@ -135,21 +100,6 @@
         .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
   }
 
-  /**
-   * Start running this microservice.
-   */
-  public void run() {
-    // Execution plan
-    LOGGER.info("Execution Plan: {}", this.env.getExecutionPlan());
-
-    // Execute Job
-    try {
-      this.env.execute(this.applicationId);
-    } catch (final Exception e) { // NOPMD Execution thrown by Flink
-      LOGGER.error("An error occured while running this job.", e);
-    }
-  }
-
   public static void main(final String[] args) {
     new HistoryServiceFlinkJob().run();
   }
diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java
index 726a7f590..583ff36a6 100644
--- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java
@@ -2,15 +2,12 @@ package rocks.theodolite.benchmarks.uc4.flink; // NOPMD Imports required
 
 import java.time.Duration;
 import java.util.Set;
-import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
@@ -19,14 +16,13 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService;
 import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory;
-import rocks.theodolite.benchmarks.commons.flink.StateBackends;
 import rocks.theodolite.benchmarks.commons.flink.TupleType;
 import rocks.theodolite.benchmarks.uc4.flink.util.ImmutableSensorRegistrySerializer;
 import rocks.theodolite.benchmarks.uc4.flink.util.ImmutableSetSerializer;
 import rocks.theodolite.benchmarks.uc4.flink.util.SensorParentKey;
 import rocks.theodolite.benchmarks.uc4.flink.util.SensorParentKeySerializer;
-import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
 import titan.ccp.configuration.events.Event;
 import titan.ccp.configuration.events.EventSerde;
@@ -38,57 +34,19 @@ import titan.ccp.model.sensorregistry.SensorRegistry;
 /**
  * The Aggregation microservice implemented as a Flink job.
  */
-public final class AggregationServiceFlinkJob {
+public final class AggregationServiceFlinkJob extends AbstractFlinkService {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(AggregationServiceFlinkJob.class);
 
-  private final Configuration config = ServiceConfigurations.createWithDefaults();
-  private final StreamExecutionEnvironment env;
-  private final String applicationId;
-
   /**
    * Create a new {@link AggregationServiceFlinkJob}.
    */
   public AggregationServiceFlinkJob() {
-    final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
-    final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    this.applicationId = applicationName + "-" + applicationVersion;
-
-    // Execution environment configuration
-    // org.apache.flink.configuration.Configuration conf = new
-    // org.apache.flink.configuration.Configuration();
-    // conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
-    // final StreamExecutionEnvironment env =
-    // StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
-    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-    this.configureEnv();
-
-    this.buildPipeline();
+    super();
   }
 
-  private void configureEnv() {
-    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
-    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
-    if (checkpointing) {
-      this.env.enableCheckpointing(commitIntervalMs);
-    }
-
-    // Parallelism
-    final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
-    if (parallelism != null) {
-      LOGGER.info("Set parallelism: {}.", parallelism);
-      this.env.setParallelism(parallelism);
-    }
-
-    // State Backend
-    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
-    this.env.setStateBackend(stateBackend);
-
-    this.configureSerializers();
-  }
-
-  private void configureSerializers() {
+  @Override
+  protected void configureSerializers() {
     this.env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class,
         new ImmutableSensorRegistrySerializer());
     this.env.getConfig().registerTypeWithKryoSerializer(SensorParentKey.class,
@@ -108,7 +66,8 @@ public final class AggregationServiceFlinkJob {
             s.getSerializer().getClass().getName()));
   }
 
-  private void buildPipeline() {
+  @Override
+  protected void buildPipeline() {
     // Get configurations
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
@@ -204,21 +163,6 @@
         .addSink(kafkaAggregationSink).name("[Kafka Producer] Topic: " + outputTopic);
   }
 
-  /**
-   * Start running this microservice.
-   */
-  public void run() {
-    // Execution plan
-    LOGGER.info("Execution plan: {}", this.env.getExecutionPlan());
-
-    // Execute Job
-    try {
-      this.env.execute(this.applicationId);
-    } catch (final Exception e) { // NOPMD Execution thrown by Flink
-      LOGGER.error("An error occured while running this job.", e);
-    }
-  }
-
   public static void main(final String[] args) {
     new AggregationServiceFlinkJob().run();
   }
-- 
GitLab
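
Reviewer note (below the signature; ignored by "git am"): after this
refactoring, a new use case job reduces to roughly the following
skeleton. This is a sketch distilled from the uc1 changes above;
MyServiceFlinkJob and the comments are placeholders, not code from the
repository, and imports are omitted.

    public final class MyServiceFlinkJob extends AbstractFlinkService {

      @Override
      protected void configureSerializers() {
        // Register job-specific Kryo serializers here, if any.
      }

      @Override
      protected void buildPipeline() {
        // Build the job-specific dataflow on this.env here, e.g. using
        // KafkaConnectorFactory for sources and sinks.
      }

      public static void main(final String[] args) {
        new MyServiceFlinkJob().run();
      }
    }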