Commit fcc123c7 authored by Sören Henning

Refactor UC3

parent d26e223e
1 merge request: !90 Migrate Flink benchmark implementation
Pipeline #2291 passed
@@ -31,32 +31,72 @@ import titan.ccp.model.records.ActivePowerRecord;
 /**
  * The History microservice implemented as a Flink job.
  */
-public class HistoryServiceFlinkJob {
+public final class HistoryServiceFlinkJob {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
 
   private final Configuration config = ServiceConfigurations.createWithDefaults();
+  private final StreamExecutionEnvironment env;
+  private final String applicationId;
 
-  private void run() {
-    // Configurations
+  /**
+   * Create a new instance of the {@link HistoryServiceFlinkJob}.
+   */
+  public HistoryServiceFlinkJob() {
     final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
     final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    final String applicationId = applicationName + "-" + applicationVersion;
+    this.applicationId = applicationName + "-" + applicationVersion;
+
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    this.configureEnv();
+
+    this.buildPipeline();
+  }
+
+  private void configureEnv() {
+    this.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
     final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
+    if (checkpointing) {
+      this.env.enableCheckpointing(commitIntervalMs);
+    }
+
+    // State Backend
+    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
+    this.env.setStateBackend(stateBackend);
+
+    this.configureSerializers();
+  }
+
+  private void configureSerializers() {
+    this.env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class,
+        new HourOfDayKeySerde());
+    this.env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
+    for (final var entry : this.env.getConfig().getRegisteredTypesWithKryoSerializers()
+        .entrySet()) {
+      LOGGER.info("Class {} registered with serializer {}.",
+          entry.getKey().getName(),
+          entry.getValue().getSerializer().getClass().getName());
+    }
+  }
+
+  private void buildPipeline() {
+    // Configurations
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
-    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final ZoneId timeZone = ZoneId.of(this.config.getString(ConfigurationKeys.TIME_ZONE));
     final Time aggregationDuration =
         Time.seconds(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
     final Time aggregationAdvance =
         Time.seconds(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
-    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
     final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
-        applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
+        this.applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
 
     // Sources and Sinks
     final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource =
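
Note: the core change in this hunk is decomposing the former run() into a constructor plus configureEnv(), configureSerializers(), and buildPipeline(). The following is a hedged, self-contained sketch of that environment-setup pattern using plain Flink 1.12-era APIs; MyKey and MyKeySerializer are hypothetical stand-ins for HourOfDayKey and HourOfDayKeySerde, not code from this commit.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.Serializable;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class EnvConfigSketch {

  /** Mirrors the configureEnv()/configureSerializers() split introduced above. */
  public static StreamExecutionEnvironment createConfiguredEnv(
      final boolean checkpointing, final int commitIntervalMs) {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Event-time semantics, as set by the job (deprecated since Flink 1.12 but available).
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // Checkpointing is optional and driven by configuration.
    if (checkpointing) {
      env.enableCheckpointing(commitIntervalMs);
    }
    // Register a custom Kryo serializer for a type Flink cannot handle natively.
    env.getConfig().registerTypeWithKryoSerializer(MyKey.class, new MyKeySerializer());
    return env;
  }

  /** Hypothetical key type standing in for HourOfDayKey. */
  public static final class MyKey {
    public int hourOfDay;
    public String sensorId;
  }

  /** Hypothetical Kryo serializer standing in for HourOfDayKeySerde. */
  public static final class MyKeySerializer extends Serializer<MyKey> implements Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public void write(final Kryo kryo, final Output output, final MyKey key) {
      output.writeInt(key.hourOfDay);
      output.writeString(key.sensorId);
    }

    @Override
    public MyKey read(final Kryo kryo, final Input input, final Class<MyKey> type) {
      final MyKey key = new MyKey();
      key.hourOfDay = input.readInt();
      key.sensorId = input.readString();
      return key;
    }
  }
}

The instance-based registerTypeWithKryoSerializer overload requires the serializer itself to be Serializable, since it is shipped with the job graph; a Class-based overload exists that avoids this requirement.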
@@ -67,29 +107,9 @@ public class HistoryServiceFlinkJob {
         Serdes::String,
         Types.TUPLE(Types.STRING, Types.STRING));
 
-    // Execution environment configuration
-    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-    if (checkpointing) {
-      env.enableCheckpointing(commitIntervalMs);
-    }
-
-    // State Backend
-    env.setStateBackend(stateBackend);
-
-    // Kryo serializer registration
-    env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class, new HourOfDayKeySerde());
-    env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
-    for (final var entry : env.getConfig().getRegisteredTypesWithKryoSerializers().entrySet()) {
-      LOGGER.info("Class {} registered with serializer {}.",
-          entry.getKey().getName(),
-          entry.getValue().getSerializer().getClass().getName());
-    }
-
     // Streaming topology
     final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
-    env
+    this.env
         .addSource(kafkaSource)
         .name("[Kafka Consumer] Topic: " + inputTopic)
         .rebalance()
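
Note: the .returns(Types.TUPLE(Types.STRING, Types.STRING)) hint kept in this topology is needed because Java erases the lambda's generic types, so Flink cannot infer the Tuple2<String, String> output type on its own. A minimal illustration (class and element names here are illustrative, not from the commit):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class ReturnsHintSketch {
  public static void main(final String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final DataStream<Tuple2<String, String>> out = env
        .fromElements("sensor1", "sensor2")
        .map(id -> Tuple2.of(id, "stats-for-" + id))
        // Without this hint, Flink cannot extract the Tuple2 type parameters from the
        // lambda and fails with an InvalidTypesException once the stream is used.
        .returns(Types.TUPLE(Types.STRING, Types.STRING));
    out.print();
    env.execute("returns-hint-sketch");
  }
}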
@@ -110,13 +130,18 @@ public class HistoryServiceFlinkJob {
         .name("map")
         .returns(Types.TUPLE(Types.STRING, Types.STRING))
         .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
+  }
 
+  /**
+   * Start running this microservice.
+   */
+  public void run() {
     // Execution plan
-    LOGGER.info("Execution Plan: {}", env.getExecutionPlan());
+    LOGGER.info("Execution Plan: {}", this.env.getExecutionPlan());
 
     // Execute Job
     try {
-      env.execute(applicationId);
+      this.env.execute(this.applicationId);
     } catch (final Exception e) { // NOPMD Exception thrown by Flink
       LOGGER.error("An error occurred while running this job.", e);
     }
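
Note: after this refactor, construction does all configuration and topology building, so run() only logs the execution plan and submits the job. A hedged sketch of how the class is presumably driven (the entry point is outside this diff's excerpt; the class name below is hypothetical):

public class HistoryServiceFlinkJobMain {
  public static void main(final String[] args) {
    // The constructor configures the environment and builds the pipeline;
    // run() then submits the job and logs any failure.
    new HistoryServiceFlinkJob().run();
  }
}

Separating construction from execution this way also makes the topology inspectable (e.g., via getExecutionPlan()) without actually submitting the job.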