Commit ccf3ade9 authored by Sören Henning

Encapsulate StateBackend factory

parent df36a2f7
Merge request !90: Migrate Flink benchmark implementation
Pipeline #2258 failed
ConfigurationKeys.java (theodolite.commons.flink):

package theodolite.commons.flink;

/**
 * Keys to access configuration parameters.
 */
public final class ConfigurationKeys {

  public static final String FLINK_STATE_BACKEND = "flink.state.backend";

  public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";

  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD
      "flink.state.backend.memory.size";

  public static final String FLINK_CHECKPOINTING = "checkpointing";

  private ConfigurationKeys() {}

}
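For reference, a minimal sketch of how these keys could be supplied through Apache Commons Configuration 2, which the factory below consumes. The MapConfiguration harness and the concrete values are illustrative assumptions; only the key names and the default path come from this commit.

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.MapConfiguration;

import theodolite.commons.flink.ConfigurationKeys;

public final class ConfigurationKeysExample {

  public static void main(final String[] args) {
    // Illustrative values; "rocksdb" is one of the backend types accepted by
    // the StateBackends factory below, and the path matches its default.
    final Map<String, Object> settings = new HashMap<>();
    settings.put(ConfigurationKeys.FLINK_STATE_BACKEND, "rocksdb");
    settings.put(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");

    final Configuration config = new MapConfiguration(settings);
    System.out.println(config.getString(ConfigurationKeys.FLINK_STATE_BACKEND));
  }
}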
StateBackends.java (theodolite.commons.flink):

package theodolite.commons.flink;

import java.io.IOException;

import org.apache.commons.configuration2.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Provides factory methods for creating Flink {@link StateBackend}s.
 */
public final class StateBackends {

  public static final String STATE_BACKEND_TYPE_MEMORY = "memory";
  public static final String STATE_BACKEND_TYPE_FILESYSTEM = "filesystem";
  public static final String STATE_BACKEND_TYPE_ROCKSDB = "rocksdb";
  public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_ROCKSDB;
  public static final String DEFAULT_STATE_BACKEND_PATH = "/opt/flink/statebackend";

  private static final Logger LOGGER = LoggerFactory.getLogger(StateBackends.class);

  private StateBackends() {}
  /**
   * Creates a Flink {@link StateBackend} from a {@link Configuration} using the
   * {@link ConfigurationKeys#FLINK_STATE_BACKEND},
   * {@link ConfigurationKeys#FLINK_STATE_BACKEND_MEMORY_SIZE} and
   * {@link ConfigurationKeys#FLINK_STATE_BACKEND_PATH} configuration keys. Possible values for
   * the {@link ConfigurationKeys#FLINK_STATE_BACKEND} key are {@link #STATE_BACKEND_TYPE_ROCKSDB},
   * {@link #STATE_BACKEND_TYPE_FILESYSTEM} and {@link #STATE_BACKEND_TYPE_MEMORY}, with
   * {@link #STATE_BACKEND_TYPE_ROCKSDB} being the default.
   *
   * @param configuration the configuration to read the state backend settings from
   * @return the configured {@link StateBackend}
   */
  public static StateBackend fromConfiguration(final Configuration configuration) {
    // Fall back to the default backend type when the key is unset; switching on
    // a null String would otherwise throw a NullPointerException.
    final String stateBackendType = configuration.getString(
        ConfigurationKeys.FLINK_STATE_BACKEND, STATE_BACKEND_TYPE_DEFAULT);
    switch (stateBackendType) {
      case STATE_BACKEND_TYPE_MEMORY:
        final int memoryStateBackendSize = configuration.getInt(
            ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
        return new MemoryStateBackend(memoryStateBackendSize);
      case STATE_BACKEND_TYPE_FILESYSTEM:
        final String stateBackendPath = configuration.getString(
            ConfigurationKeys.FLINK_STATE_BACKEND_PATH,
            DEFAULT_STATE_BACKEND_PATH);
        return new FsStateBackend(stateBackendPath);
      case STATE_BACKEND_TYPE_ROCKSDB:
      default:
        final String stateBackendPath2 = configuration.getString(
            ConfigurationKeys.FLINK_STATE_BACKEND_PATH,
            DEFAULT_STATE_BACKEND_PATH);
        try {
          return new RocksDBStateBackend(stateBackendPath2, true);
        } catch (final IOException e) {
          LOGGER.error("Cannot create RocksDB state backend.", e);
          throw new IllegalStateException(e);
        }
    }
  }

}
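A usage sketch showing where the factory slots into a job, mirroring the refactored use cases below. The skeleton class is hypothetical; only fromConfiguration and env.setStateBackend appear in this commit.

import org.apache.commons.configuration2.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import theodolite.commons.flink.StateBackends;

public class ExampleFlinkJob {

  private final Configuration config;

  public ExampleFlinkJob(final Configuration config) {
    this.config = config;
  }

  public void execute() throws Exception {
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // One call replaces the per-job if/else blocks removed in the diffs below.
    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
    env.setStateBackend(stateBackend);

    // ... sources, operators and sinks would be wired up here ...
    env.execute("example-job"); // job name is illustrative
  }
}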
HistoryServiceFlinkJob.java (theodolite.uc2.application):

 package theodolite.uc2.application;

 import com.google.common.math.Stats;
-import java.io.IOException;
 import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -10,10 +9,8 @@ import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
@@ -24,6 +21,7 @@ import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.StateBackends;
 import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
 import theodolite.commons.flink.serialization.StatsSerializer;
 import titan.ccp.common.configuration.ServiceConfigurations;
@@ -49,14 +47,8 @@ public class HistoryServiceFlinkJob {
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final int windowDuration = this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
-    final String stateBackend =
-        this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
-    final String stateBackendPath = this.config
-        .getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
-    final int memoryStateBackendSize =
-        this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
-            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
+    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);

     final Properties kafkaProps = new Properties();
     kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
@@ -69,7 +61,6 @@ public class HistoryServiceFlinkJob {
     final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>(
         inputTopic, sourceSerde, kafkaProps);
     kafkaSource.setStartFromGroupOffsets();
-
     if (checkpointing) {
       kafkaSource.setCommitOffsetsOnCheckpoints(true);
@@ -81,7 +72,7 @@ public class HistoryServiceFlinkJob {
         Serdes::String,
         Serdes::String,
         TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));
-    kafkaProps.setProperty("transaction.timeout.ms", "" + 5 * 60 * 1000);
+    kafkaProps.setProperty("transaction.timeout.ms", "" + 5 * 60 * 1000); // TODO necessary?
     final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>(
         outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
     kafkaSink.setWriteTimestampToKafka(true);
@@ -94,17 +85,7 @@ public class HistoryServiceFlinkJob {
     }

     // State Backend
-    if (stateBackend.equals("filesystem")) {
-      env.setStateBackend(new FsStateBackend(stateBackendPath));
-    } else if (stateBackend.equals("rocksdb")) {
-      try {
-        env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
-      } catch (final IOException e) {
-        LOGGER.error("Cannot create RocksDB state backend.", e);
-      }
-    } else {
-      env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
-    }
+    env.setStateBackend(stateBackend);

     env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
     env.getConfig().getRegisteredTypesWithKryoSerializers()
...
HistoryServiceFlinkJob.java (theodolite.uc3.application):

 package theodolite.uc3.application;

 import com.google.common.math.Stats;
-import java.io.IOException;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -14,10 +13,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
@@ -27,6 +24,7 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.StateBackends;
 import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
 import theodolite.commons.flink.serialization.StatsSerializer;
 import theodolite.uc3.application.util.HourOfDayKey;
@@ -62,13 +60,7 @@ public class HistoryServiceFlinkJob {
         Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
     final Time aggregationAdvance =
         Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
-    final String stateBackend =
-        this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
-    final String stateBackendPath = this.config
-        .getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
-    final int memoryStateBackendSize =
-        this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
-            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
+    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);

     final Properties kafkaProps = new Properties();
@@ -110,17 +102,7 @@ public class HistoryServiceFlinkJob {
     }

     // State Backend
-    if (stateBackend.equals("filesystem")) {
-      env.setStateBackend(new FsStateBackend(stateBackendPath));
-    } else if (stateBackend.equals("rocksdb")) {
-      try {
-        env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
-      } catch (final IOException e) {
-        LOGGER.error("Cannot create RocksDB state backend.", e);
-      }
-    } else {
-      env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
-    }
+    env.setStateBackend(stateBackend);

     // Kryo serializer registration
     env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class, new HourOfDayKeySerde());
...
AggregationServiceFlinkJob.java (theodolite.uc4.application):

 package theodolite.uc4.application;

-import java.io.IOException;
 import java.time.Duration;
 import java.util.Properties;
 import java.util.Set;
@@ -12,10 +11,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,6 +23,7 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.StateBackends;
 import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
 import theodolite.uc4.application.util.ImmutableSensorRegistrySerializer;
 import theodolite.uc4.application.util.ImmutableSetSerializer;
@@ -65,13 +63,7 @@ public class AggregationServiceFlinkJob {
         Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS));
     final String configurationTopic =
         this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC);
-    final String stateBackend =
-        this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
-    final String stateBackendPath = this.config
-        .getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
-    final int memoryStateBackendSize =
-        this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
-            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
+    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
     final boolean debug = this.config.getBoolean(ConfigurationKeys.DEBUG, true);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);

@@ -155,17 +147,7 @@ public class AggregationServiceFlinkJob {
     }

     // State Backend
-    if (stateBackend.equals("filesystem")) {
-      env.setStateBackend(new FsStateBackend(stateBackendPath));
-    } else if (stateBackend.equals("rocksdb")) {
-      try {
-        env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
-      } catch (final IOException e) {
-        LOGGER.error("Cannot create RocksDB state backend.", e);
-      }
-    } else {
-      env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
-    }
+    env.setStateBackend(stateBackend);

     // Kryo serializer registration
     env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class,
...
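To see the factory's dispatch end to end, a small self-contained check (a hypothetical harness; the MapConfiguration setup and the printed comparison are assumptions, not part of the commit):

import java.util.Map;

import org.apache.commons.configuration2.MapConfiguration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;

import theodolite.commons.flink.ConfigurationKeys;
import theodolite.commons.flink.StateBackends;

public final class StateBackendsCheck {

  public static void main(final String[] args) {
    check(StateBackends.STATE_BACKEND_TYPE_MEMORY, MemoryStateBackend.class);
    check(StateBackends.STATE_BACKEND_TYPE_FILESYSTEM, FsStateBackend.class);
    check(StateBackends.STATE_BACKEND_TYPE_ROCKSDB, RocksDBStateBackend.class);
  }

  private static void check(final String type, final Class<? extends StateBackend> expected) {
    // Each configured type should map to the corresponding backend class.
    final StateBackend backend = StateBackends.fromConfiguration(
        new MapConfiguration(Map.of(ConfigurationKeys.FLINK_STATE_BACKEND, type)));
    System.out.println(type + " -> " + backend.getClass().getSimpleName()
        + " (expected " + expected.getSimpleName() + ")");
  }
}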