Skip to content
Snippets Groups Projects
Commit 6e61db7a authored by Sören Henning's avatar Sören Henning
Browse files

Revert to previous default state backend

parent ccf3ade9
Branches
Tags
1 merge request!90Migrate Flink benchmark implementation
Pipeline #2259 failed
......@@ -17,8 +17,9 @@ public final class StateBackends {
public static final String STATE_BACKEND_TYPE_MEMORY = "memory";
public static final String STATE_BACKEND_TYPE_FILESYSTEM = "filesystem";
public static final String STATE_BACKEND_TYPE_ROCKSDB = "rocksdb";
public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_ROCKSDB;
public static final String DEFAULT_STATE_BACKEND_PATH = "/opt/flink/statebackend";
// public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_ROCKSDB;
public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_MEMORY;
public static final String DEFAULT_STATE_BACKEND_PATH = "file:///opt/flink/statebackend";
private static final Logger LOGGER = LoggerFactory.getLogger(StateBackends.class);
......@@ -35,7 +36,9 @@ public final class StateBackends {
* {@code StateBackendFactory#STATE_BACKEND_TYPE_ROCKSDB} is the default.
*/
public static StateBackend fromConfiguration(final Configuration configuration) {
switch (configuration.getString(ConfigurationKeys.FLINK_STATE_BACKEND)) {
final String stateBackendType =
configuration.getString(ConfigurationKeys.FLINK_STATE_BACKEND, STATE_BACKEND_TYPE_DEFAULT);
switch (stateBackendType) {
case STATE_BACKEND_TYPE_MEMORY:
final int memoryStateBackendSize = configuration.getInt(
ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
......@@ -47,7 +50,6 @@ public final class StateBackends {
DEFAULT_STATE_BACKEND_PATH);
return new FsStateBackend(stateBackendPath);
case STATE_BACKEND_TYPE_ROCKSDB:
default:
final String stateBackendPath2 = configuration.getString(
ConfigurationKeys.FLINK_STATE_BACKEND_PATH,
DEFAULT_STATE_BACKEND_PATH);
......@@ -57,6 +59,9 @@ public final class StateBackends {
LOGGER.error("Cannot create RocksDB state backend.", e);
throw new IllegalStateException(e);
}
default:
throw new IllegalArgumentException(
"Unsupported state backend '" + stateBackendType + "' configured.");
}
}
......
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment