From fcc123c7c5ef3bdb45d432f247c359035ed25a57 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Fri, 12 Mar 2021 19:28:16 +0100
Subject: [PATCH] Refactor UC3

---
 .../application/HistoryServiceFlinkJob.java   | 85 ++++++++++++-------
 1 file changed, 55 insertions(+), 30 deletions(-)

diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
index cd0000121..67af5bf6e 100644
--- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
@@ -31,32 +31,72 @@ import titan.ccp.model.records.ActivePowerRecord;
 /**
  * The History microservice implemented as a Flink job.
  */
-public class HistoryServiceFlinkJob {
+public final class HistoryServiceFlinkJob {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
 
   private final Configuration config = ServiceConfigurations.createWithDefaults();
+  private final StreamExecutionEnvironment env;
+  private final String applicationId;
 
-  private void run() {
-    // Configurations
+  /**
+   * Create a new instance of the {@link HistoryServiceFlinkJob}.
+   */
+  public HistoryServiceFlinkJob() {
     final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
     final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    final String applicationId = applicationName + "-" + applicationVersion;
+    this.applicationId = applicationName + "-" + applicationVersion;
+
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    this.configureEnv();
+
+    this.buildPipeline();
+  }
+
+  private void configureEnv() {
+    this.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
     final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
+    if (checkpointing) {
+      this.env.enableCheckpointing(commitIntervalMs);
+    }
+
+    // State Backend
+    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
+    this.env.setStateBackend(stateBackend);
+
+    this.configureSerializers();
+  }
+
+  private void configureSerializers() {
+    this.env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class,
+        new HourOfDayKeySerde());
+    this.env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
+    for (final var entry : this.env.getConfig().getRegisteredTypesWithKryoSerializers()
+        .entrySet()) {
+      LOGGER.info("Class {} registered with serializer {}.",
+          entry.getKey().getName(),
+          entry.getValue().getSerializer().getClass().getName());
+    }
+  }
+
+  private void buildPipeline() {
+    // Configurations
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
-    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final ZoneId timeZone = ZoneId.of(this.config.getString(ConfigurationKeys.TIME_ZONE));
     final Time aggregationDuration =
         Time.seconds(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
     final Time aggregationAdvance =
         Time.seconds(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
-    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
     final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
-        applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
+        this.applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
 
     // Sources and Sinks
     final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource =
@@ -67,29 +107,9 @@ public class HistoryServiceFlinkJob {
             Serdes::String,
             Types.TUPLE(Types.STRING, Types.STRING));
 
-    // Execution environment configuration
-    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-    if (checkpointing) {
-      env.enableCheckpointing(commitIntervalMs);
-    }
-
-    // State Backend
-    env.setStateBackend(stateBackend);
-
-    // Kryo serializer registration
-    env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class, new HourOfDayKeySerde());
-    env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
-    for (final var entry : env.getConfig().getRegisteredTypesWithKryoSerializers().entrySet()) {
-      LOGGER.info("Class {} registered with serializer {}.",
-          entry.getKey().getName(),
-          entry.getValue().getSerializer().getClass().getName());
-    }
-
     // Streaming topology
     final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
-    env
+    this.env
         .addSource(kafkaSource)
         .name("[Kafka Consumer] Topic: " + inputTopic)
         .rebalance()
@@ -110,13 +130,18 @@ public class HistoryServiceFlinkJob {
         .name("map")
         .returns(Types.TUPLE(Types.STRING, Types.STRING))
         .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
+  }
 
+  /**
+   * Start running this microservice.
+   */
+  public void run() {
     // Execution plan
-    LOGGER.info("Execution Plan: {}", env.getExecutionPlan());
+    LOGGER.info("Execution Plan: {}", this.env.getExecutionPlan());
 
     // Execute Job
     try {
-      env.execute(applicationId);
+      this.env.execute(this.applicationId);
     } catch (final Exception e) { // NOPMD Execution thrown by Flink
       LOGGER.error("An error occured while running this job.", e);
     }
-- 
GitLab