From ccf3ade94ef2a54b6c75b177946828c1b1abba3a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Thu, 11 Mar 2021 19:11:16 +0100
Subject: [PATCH] Encapsulate StateBackend creation in a factory

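The Flink jobs of UC2, UC3, and UC4 previously configured their state
backend via duplicated if/else blocks. This change moves that logic into
a shared StateBackends factory in flink-commons, driven by the
configuration keys flink.state.backend, flink.state.backend.path, and
flink.state.backend.memory.size.

A minimal usage sketch, using only names introduced or already used in
this patch (env is the job's StreamExecutionEnvironment):

    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
    env.setStateBackend(stateBackend);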
---
 .../commons/flink/ConfigurationKeys.java      | 19 ++++++
 .../commons/flink/StateBackends.java          | 72 +++++++++++++++++++
 .../application/HistoryServiceFlinkJob.java   | 29 ++-------
 .../application/HistoryServiceFlinkJob.java   | 26 ++------
 .../AggregationServiceFlinkJob.java           | 26 ++------
 5 files changed, 104 insertions(+), 68 deletions(-)
 create mode 100644 theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/ConfigurationKeys.java
 create mode 100644 theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/StateBackends.java

diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/ConfigurationKeys.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/ConfigurationKeys.java
new file mode 100644
index 000000000..2847ede44
--- /dev/null
+++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/ConfigurationKeys.java
@@ -0,0 +1,19 @@
+package theodolite.commons.flink;
+
+/**
+ * Keys to access configuration parameters.
+ */
+public final class ConfigurationKeys {
+
+  public static final String FLINK_STATE_BACKEND = "flink.state.backend";
+
+  public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";
+
+  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD
+      "flink.state.backend.memory.size";
+
+  public static final String FLINK_CHECKPOINTING = "checkpointing";
+
+  private ConfigurationKeys() {}
+
+}
diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/StateBackends.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/StateBackends.java
new file mode 100644
index 000000000..67646e3bb
--- /dev/null
+++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/StateBackends.java
@@ -0,0 +1,72 @@
+package theodolite.commons.flink;
+
+import java.io.IOException;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides factory methods for creating Flink {@link StateBackend}s.
+ */
+public final class StateBackends {
+
+  public static final String STATE_BACKEND_TYPE_MEMORY = "memory";
+  public static final String STATE_BACKEND_TYPE_FILESYSTEM = "filesystem";
+  public static final String STATE_BACKEND_TYPE_ROCKSDB = "rocksdb";
+  public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_ROCKSDB;
+  public static final String DEFAULT_STATE_BACKEND_PATH = "/opt/flink/statebackend";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(StateBackends.class);
+
+  private StateBackends() {}
+
+  /**
+   * Create a Flink {@link StateBackend} from a {@link Configuration} and the
+   * {@code ConfigurationKeys#FLINK_STATE_BACKEND},
+   * {@code ConfigurationKeys#FLINK_STATE_BACKEND_MEMORY_SIZE} and
+   * {@code ConfigurationKeys#FLINK_STATE_BACKEND_PATH} configuration keys. Possible options for the
+   * {@code ConfigurationKeys#FLINK_STATE_BACKEND} configuration are
+   * {@code #STATE_BACKEND_TYPE_ROCKSDB}, {@code #STATE_BACKEND_TYPE_FILESYSTEM} and
+   * {@code #STATE_BACKEND_TYPE_MEMORY}, where
+   * {@code #STATE_BACKEND_TYPE_ROCKSDB} is the default.
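+   *
+   * <p>Usage sketch (with {@code env} being the job's {@code StreamExecutionEnvironment}):
+   *
+   * <pre>{@code
+   * final StateBackend stateBackend = StateBackends.fromConfiguration(configuration);
+   * env.setStateBackend(stateBackend);
+   * }</pre>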
+   */
+  public static StateBackend fromConfiguration(final Configuration configuration) {
+    final String stateBackendType = configuration.getString(
+        ConfigurationKeys.FLINK_STATE_BACKEND, STATE_BACKEND_TYPE_DEFAULT);
+    switch (stateBackendType) {
+      case STATE_BACKEND_TYPE_MEMORY:
+        final int memoryStateBackendSize = configuration.getInt(
+            ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
+            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
+        return new MemoryStateBackend(memoryStateBackendSize);
+      case STATE_BACKEND_TYPE_FILESYSTEM:
+        final String stateBackendPath = configuration.getString(
+            ConfigurationKeys.FLINK_STATE_BACKEND_PATH,
+            DEFAULT_STATE_BACKEND_PATH);
+        return new FsStateBackend(stateBackendPath);
+      case STATE_BACKEND_TYPE_ROCKSDB:
+      default:
+        final String stateBackendPath2 = configuration.getString(
+            ConfigurationKeys.FLINK_STATE_BACKEND_PATH,
+            DEFAULT_STATE_BACKEND_PATH);
+        try {
+          return new RocksDBStateBackend(stateBackendPath2, true);
+        } catch (final IOException e) {
+          LOGGER.error("Cannot create RocksDB state backend.", e);
+          throw new IllegalStateException(e);
+        }
+    }
+  }
+
+}
diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
index fa82973ec..b438d6513 100644
--- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
@@ -1,7 +1,6 @@
 package theodolite.uc2.application;
 
 import com.google.common.math.Stats;
-import java.io.IOException;
 import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -10,10 +9,8 @@ import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
@@ -24,6 +21,7 @@ import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.StateBackends;
 import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
 import theodolite.commons.flink.serialization.StatsSerializer;
 import titan.ccp.common.configuration.ServiceConfigurations;
@@ -49,14 +47,8 @@ public class HistoryServiceFlinkJob {
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final int windowDuration = this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
-    final String stateBackend =
-        this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
-    final String stateBackendPath = this.config
-        .getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
-    final int memoryStateBackendSize =
-        this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
-            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
+    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
 
     final Properties kafkaProps = new Properties();
     kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
@@ -69,7 +61,6 @@ public class HistoryServiceFlinkJob {
 
     final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>(
         inputTopic, sourceSerde, kafkaProps);
-
     kafkaSource.setStartFromGroupOffsets();
     if (checkpointing) {
       kafkaSource.setCommitOffsetsOnCheckpoints(true);
@@ -81,7 +72,7 @@ public class HistoryServiceFlinkJob {
             Serdes::String,
             Serdes::String,
             TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));
-    kafkaProps.setProperty("transaction.timeout.ms", "" + 5 * 60 * 1000);
+    kafkaProps.setProperty("transaction.timeout.ms", "" + 5 * 60 * 1000); // TODO necessary?
     final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>(
         outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
     kafkaSink.setWriteTimestampToKafka(true);
@@ -94,17 +85,7 @@ public class HistoryServiceFlinkJob {
     }
 
     // State Backend
-    if (stateBackend.equals("filesystem")) {
-      env.setStateBackend(new FsStateBackend(stateBackendPath));
-    } else if (stateBackend.equals("rocksdb")) {
-      try {
-        env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
-      } catch (final IOException e) {
-        LOGGER.error("Cannot create RocksDB state backend.", e);
-      }
-    } else {
-      env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
-    }
+    env.setStateBackend(stateBackend);
 
     env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
     env.getConfig().getRegisteredTypesWithKryoSerializers()
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
index 64819070b..81d7110ac 100644
--- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
@@ -1,7 +1,6 @@
 package theodolite.uc3.application;
 
 import com.google.common.math.Stats;
-import java.io.IOException;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -14,10 +13,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
@@ -27,6 +24,7 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.StateBackends;
 import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
 import theodolite.commons.flink.serialization.StatsSerializer;
 import theodolite.uc3.application.util.HourOfDayKey;
@@ -62,13 +60,7 @@ public class HistoryServiceFlinkJob {
         Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
     final Time aggregationAdvance =
         Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
-    final String stateBackend =
-        this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
-    final String stateBackendPath = this.config
-        .getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
-    final int memoryStateBackendSize =
-        this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
-            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
+    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
     final Properties kafkaProps = new Properties();
@@ -110,17 +102,7 @@ public class HistoryServiceFlinkJob {
     }
 
     // State Backend
-    if (stateBackend.equals("filesystem")) {
-      env.setStateBackend(new FsStateBackend(stateBackendPath));
-    } else if (stateBackend.equals("rocksdb")) {
-      try {
-        env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
-      } catch (final IOException e) {
-        LOGGER.error("Cannot create RocksDB state backend.", e);
-      }
-    } else {
-      env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
-    }
+    env.setStateBackend(stateBackend);
 
     // Kryo serializer registration
     env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class, new HourOfDayKeySerde());
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
index d76c68c14..e2c5bbed4 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
@@ -1,6 +1,5 @@
 package theodolite.uc4.application;
 
-import java.io.IOException;
 import java.time.Duration;
 import java.util.Properties;
 import java.util.Set;
@@ -12,10 +11,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,6 +23,7 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.StateBackends;
 import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
 import theodolite.uc4.application.util.ImmutableSensorRegistrySerializer;
 import theodolite.uc4.application.util.ImmutableSetSerializer;
@@ -65,13 +63,7 @@ public class AggregationServiceFlinkJob {
         Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS));
     final String configurationTopic =
         this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC);
-    final String stateBackend =
-        this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
-    final String stateBackendPath = this.config
-        .getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
-    final int memoryStateBackendSize =
-        this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
-            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
+    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
     final boolean debug = this.config.getBoolean(ConfigurationKeys.DEBUG, true);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
@@ -155,17 +147,7 @@ public class AggregationServiceFlinkJob {
     }
 
     // State Backend
-    if (stateBackend.equals("filesystem")) {
-      env.setStateBackend(new FsStateBackend(stateBackendPath));
-    } else if (stateBackend.equals("rocksdb")) {
-      try {
-        env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
-      } catch (final IOException e) {
-        LOGGER.error("Cannot create RocksDB state backend.", e);
-      }
-    } else {
-      env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
-    }
+    env.setStateBackend(stateBackend);
 
     // Kryo serializer registration
     env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class,
-- 
GitLab