From 6e61db7a75132b290299d24c0a8332f0a44bc172 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Thu, 11 Mar 2021 19:47:08 +0100
Subject: [PATCH] Revert to previous default state backend

---
 .../theodolite/commons/flink/StateBackends.java     | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/StateBackends.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/StateBackends.java
index 67646e3bb..a94927e4b 100644
--- a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/StateBackends.java
+++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/StateBackends.java
@@ -17,8 +17,9 @@ public final class StateBackends {
   public static final String STATE_BACKEND_TYPE_MEMORY = "memory";
   public static final String STATE_BACKEND_TYPE_FILESYSTEM = "filesystem";
   public static final String STATE_BACKEND_TYPE_ROCKSDB = "rocksdb";
-  public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_ROCKSDB;
-  public static final String DEFAULT_STATE_BACKEND_PATH = "/opt/flink/statebackend";
+  // public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_ROCKSDB;
+  public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_MEMORY;
+  public static final String DEFAULT_STATE_BACKEND_PATH = "file:///opt/flink/statebackend";
 
   private static final Logger LOGGER = LoggerFactory.getLogger(StateBackends.class);
 
@@ -35,7 +36,9 @@ public final class StateBackends {
    * {@code StateBackendFactory#STATE_BACKEND_TYPE_ROCKSDB} is the default.
    */
   public static StateBackend fromConfiguration(final Configuration configuration) {
-    switch (configuration.getString(ConfigurationKeys.FLINK_STATE_BACKEND)) {
+    final String stateBackendType =
+        configuration.getString(ConfigurationKeys.FLINK_STATE_BACKEND, STATE_BACKEND_TYPE_DEFAULT);
+    switch (stateBackendType) {
       case STATE_BACKEND_TYPE_MEMORY:
         final int memoryStateBackendSize = configuration.getInt(
             ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
@@ -47,7 +50,6 @@ public final class StateBackends {
             DEFAULT_STATE_BACKEND_PATH);
         return new FsStateBackend(stateBackendPath);
       case STATE_BACKEND_TYPE_ROCKSDB:
-      default:
         final String stateBackendPath2 = configuration.getString(
             ConfigurationKeys.FLINK_STATE_BACKEND_PATH,
             DEFAULT_STATE_BACKEND_PATH);
@@ -57,6 +59,9 @@ public final class StateBackends {
           LOGGER.error("Cannot create RocksDB state backend.", e);
           throw new IllegalStateException(e);
         }
+      default:
+        throw new IllegalArgumentException(
+            "Unsupported state backend '" + stateBackendType + "' configured.");
     }
   }
 
-- 
GitLab