Skip to content
Snippets Groups Projects

Migrate Flink benchmark implementation

Merged Sören Henning requested to merge flink-benchmark-migration into master
1 file
+ 9
4
Compare changes
  • Side-by-side
  • Inline
@@ -17,8 +17,9 @@ public final class StateBackends {
public static final String STATE_BACKEND_TYPE_MEMORY = "memory";
public static final String STATE_BACKEND_TYPE_FILESYSTEM = "filesystem";
public static final String STATE_BACKEND_TYPE_ROCKSDB = "rocksdb";
public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_ROCKSDB;
public static final String DEFAULT_STATE_BACKEND_PATH = "/opt/flink/statebackend";
// public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_ROCKSDB;
public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_MEMORY;
public static final String DEFAULT_STATE_BACKEND_PATH = "file:///opt/flink/statebackend";
private static final Logger LOGGER = LoggerFactory.getLogger(StateBackends.class);
@@ -35,7 +36,9 @@ public final class StateBackends {
* {@code StateBackendFactory#STATE_BACKEND_TYPE_ROCKSDB} is the default.
*/
public static StateBackend fromConfiguration(final Configuration configuration) {
switch (configuration.getString(ConfigurationKeys.FLINK_STATE_BACKEND)) {
final String stateBackendType =
configuration.getString(ConfigurationKeys.FLINK_STATE_BACKEND, STATE_BACKEND_TYPE_DEFAULT);
switch (stateBackendType) {
case STATE_BACKEND_TYPE_MEMORY:
final int memoryStateBackendSize = configuration.getInt(
ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
@@ -47,7 +50,6 @@ public final class StateBackends {
DEFAULT_STATE_BACKEND_PATH);
return new FsStateBackend(stateBackendPath);
case STATE_BACKEND_TYPE_ROCKSDB:
default:
final String stateBackendPath2 = configuration.getString(
ConfigurationKeys.FLINK_STATE_BACKEND_PATH,
DEFAULT_STATE_BACKEND_PATH);
@@ -57,6 +59,9 @@ public final class StateBackends {
LOGGER.error("Cannot create RocksDB state backend.", e);
throw new IllegalStateException(e);
}
default:
throw new IllegalArgumentException(
"Unsupported state backend '" + stateBackendType + "' configured.");
}
}
Loading