diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/StateBackends.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/StateBackends.java index 67646e3bb1c067d86dc9daae3d1f0d0f5d5bb677..a94927e4bf49e1dbe6d109eb8f19f7d292f3d879 100644 --- a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/StateBackends.java +++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/StateBackends.java @@ -17,8 +17,9 @@ public final class StateBackends { public static final String STATE_BACKEND_TYPE_MEMORY = "memory"; public static final String STATE_BACKEND_TYPE_FILESYSTEM = "filesystem"; public static final String STATE_BACKEND_TYPE_ROCKSDB = "rocksdb"; - public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_ROCKSDB; - public static final String DEFAULT_STATE_BACKEND_PATH = "/opt/flink/statebackend"; + // public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_ROCKSDB; + public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_MEMORY; + public static final String DEFAULT_STATE_BACKEND_PATH = "file:///opt/flink/statebackend"; private static final Logger LOGGER = LoggerFactory.getLogger(StateBackends.class); @@ -35,7 +36,9 @@ public final class StateBackends { * {@code StateBackendFactory#STATE_BACKEND_TYPE_ROCKSDB} is the default. */ public static StateBackend fromConfiguration(final Configuration configuration) { - switch (configuration.getString(ConfigurationKeys.FLINK_STATE_BACKEND)) { + final String stateBackendType = + configuration.getString(ConfigurationKeys.FLINK_STATE_BACKEND, STATE_BACKEND_TYPE_DEFAULT); + switch (stateBackendType) { case STATE_BACKEND_TYPE_MEMORY: final int memoryStateBackendSize = configuration.getInt( ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE, @@ -47,7 +50,6 @@ public final class StateBackends { DEFAULT_STATE_BACKEND_PATH); return new FsStateBackend(stateBackendPath); case STATE_BACKEND_TYPE_ROCKSDB: - default: final String stateBackendPath2 = configuration.getString( ConfigurationKeys.FLINK_STATE_BACKEND_PATH, DEFAULT_STATE_BACKEND_PATH); @@ -57,6 +59,9 @@ public final class StateBackends { LOGGER.error("Cannot create RocksDB state backend.", e); throw new IllegalStateException(e); } + default: + throw new IllegalArgumentException( + "Unsupported state backend '" + stateBackendType + "' configured."); } }