Commit 689573aa authored by Sören Henning

Merge branch 'feature/172-abstract-flink-microservice' into 'master'

Feature/172 Abstract Flink Service

Closes #172

See merge request !272
parents d96b5c77 aa4492c9
1 merge request: !272 Introduce Abstract Flink Service Class
Pipeline #8636 passed
package rocks.theodolite.benchmarks.commons.flink;

import org.apache.commons.configuration2.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.configuration.ServiceConfigurations;

/**
 * A general Apache Flink-based microservice. It is configured via {@link #configureEnv()} and
 * extended by implementing the business logic in {@link #buildPipeline()}. Serializer
 * registration has to be provided by implementing {@link #configureSerializers()}.
 */
public abstract class AbstractFlinkService {

  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractFlinkService.class);

  protected final StreamExecutionEnvironment env;

  protected Configuration config = ServiceConfigurations.createWithDefaults();

  protected final String applicationId;

  /**
   * Creates the service using the default configuration, deriving the application ID and
   * obtaining the {@link StreamExecutionEnvironment}.
   */
  public AbstractFlinkService() {
    final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
    final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
    this.applicationId = applicationName + "-" + applicationVersion;
    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
  }

  /**
   * Creates the service from the given configuration, deriving the application ID and obtaining
   * the {@link StreamExecutionEnvironment}.
   *
   * @param config the configuration for the service.
   */
  public AbstractFlinkService(final Configuration config) {
    this.config = config;
    final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
    final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
    this.applicationId = applicationName + "-" + applicationVersion;
    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
  }

  /**
   * Configures the execution environment according to the service configuration: checkpointing,
   * parallelism, state backend, and serializers.
   */
  protected void configureEnv() {
    this.configureCheckpointing();
    this.configureParallelism();
    this.configureStateBackend();
    this.configureSerializers();
  }

  /**
   * Enables checkpointing with the configured commit interval, unless disabled by configuration.
   */
  protected void configureCheckpointing() {
    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
    LOGGER.info("Set checkpointing to: {}.", checkpointing);
    if (checkpointing) {
      this.env.enableCheckpointing(commitIntervalMs);
    }
  }

  /**
   * Configures the parallelism according to the configuration, if one is set.
   */
  protected void configureParallelism() {
    final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
    if (parallelism != null) {
      LOGGER.info("Set parallelism: {}.", parallelism);
      this.env.setParallelism(parallelism);
    }
  }

  /**
   * Configures the state backend according to the configuration.
   */
  public void configureStateBackend() {
    LOGGER.info("Enable state backend.");
    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
    this.env.setStateBackend(stateBackend);
  }

  protected abstract void configureSerializers();

  /**
   * Implement this method to define the processing pipeline, i.e., the business logic of the
   * microservice.
   */
  protected abstract void buildPipeline();

  /**
   * Starts the service by configuring the environment, building the pipeline, and executing the
   * Flink job.
   */
  public void run() {
    this.configureEnv();
    this.buildPipeline();
    LOGGER.info("Execution plan: {}", this.env.getExecutionPlan());

    try {
      this.env.execute(this.applicationId);
    } catch (final Exception e) { // NOPMD Exception thrown by Flink
      LOGGER.error("An error occurred while running this job.", e);
    }
  }
}
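For illustration, a minimal concrete service built on this class might look like the following sketch. The class name, the example elements, and the trivial pipeline are hypothetical and not part of this merge request; the actual services in the diffs below wire their pipelines to Kafka via KafkaConnectorFactory instead.

// A minimal sketch of a subclass (hypothetical names and values, not part of this MR).
public final class ExampleFlinkService extends AbstractFlinkService {

  @Override
  protected void configureSerializers() {
    // Plain String records need no custom Kryo serializers.
  }

  @Override
  protected void buildPipeline() {
    // Illustrative pipeline only; real services create Kafka sources and sinks.
    this.env.fromElements("sensor1", "sensor2", "sensor3")
        .map(String::toUpperCase)
        .print();
  }

  public static void main(final String[] args) {
    // run() configures the environment, builds the pipeline, and executes the job.
    new ExampleFlinkService().run();
  }
}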
ConfigurationKeys.java (rocks.theodolite.benchmarks.commons.flink)

@@ -5,6 +5,26 @@ package rocks.theodolite.benchmarks.commons.flink;
  */
 public final class ConfigurationKeys {
+  public static final String APPLICATION_NAME = "application.name";
+  public static final String APPLICATION_VERSION = "application.version";
+  public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic";
+  public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
+  public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
+  public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
+  public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
+  public static final String WINDOW_SIZE_MS = "window.size.ms";
+  public static final String WINDOW_GRACE_MS = "window.grace.ms";
+  public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
   public static final String FLINK_STATE_BACKEND = "flink.state.backend";
   public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";
@@ -12,7 +32,11 @@ public final class ConfigurationKeys {
   public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD
       "flink.state.backend.memory.size";
-  public static final String FLINK_CHECKPOINTING = "checkpointing";
+  public static final String DEBUG = "debug";
+  public static final String CHECKPOINTING = "checkpointing";
+  public static final String PARALLELISM = "parallelism";

   private ConfigurationKeys() {}
...
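For context, these keys are plain Commons Configuration property names. A minimal sketch of how a service reads them, assuming a simple properties-backed configuration (the values below are made up; the actual services obtain their configuration via ServiceConfigurations.createWithDefaults()):

import org.apache.commons.configuration2.BaseConfiguration;

// Hypothetical stand-in for the service configuration, for illustration only.
final BaseConfiguration config = new BaseConfiguration();
config.setProperty(ConfigurationKeys.CHECKPOINTING, true);
config.setProperty(ConfigurationKeys.COMMIT_INTERVAL_MS, 1000);
config.setProperty(ConfigurationKeys.PARALLELISM, 4);

// The same accessor pattern AbstractFlinkService uses:
final boolean checkpointing = config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final int commitIntervalMs = config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
final Integer parallelism = config.getInteger(ConfigurationKeys.PARALLELISM, null);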
HistoryServiceFlinkJob.java (rocks.theodolite.benchmarks.uc1.flink)

 package rocks.theodolite.benchmarks.uc1.flink;

-import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import rocks.theodolite.benchmarks.commons.commons.configuration.ServiceConfigurations;
+import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService;
 import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory;
 import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
 import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
@@ -16,48 +12,24 @@ import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
 /**
  * The History microservice implemented as a Flink job.
  */
-public final class HistoryServiceFlinkJob {
+public final class HistoryServiceFlinkJob extends AbstractFlinkService {

-  private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
-  private final Configuration config = ServiceConfigurations.createWithDefaults();
-  private final StreamExecutionEnvironment env;
-  private final String applicationId;
   private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();

-  /**
-   * Create a new instance of the {@link HistoryServiceFlinkJob}.
-   */
-  public HistoryServiceFlinkJob() {
-    final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
-    final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    this.applicationId = applicationName + "-" + applicationVersion;
-    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
-    this.configureEnv();
-    this.buildPipeline();
-  }
+  @Override
+  public void configureEnv() {
+    super.configureCheckpointing();
+    super.configureParallelism();
+  }

-  private void configureEnv() {
-    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
-    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
-    if (checkpointing) {
-      this.env.enableCheckpointing(commitIntervalMs);
-    }
-
-    // Parallelism
-    final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
-    if (parallelism != null) {
-      LOGGER.info("Set parallelism: {}.", parallelism);
-      this.env.setParallelism(parallelism);
-    }
-  }
+  @Override
+  protected void configureSerializers() {
+    // No serializers needed here
+  }

-  private void buildPipeline() {
+  @Override
+  protected void buildPipeline() {
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
@@ -79,17 +51,6 @@ public final class HistoryServiceFlinkJob {
         .returns(Types.VOID); // Will never be used
   }

-  /**
-   * Start running this microservice.
-   */
-  public void run() {
-    try {
-      this.env.execute(this.applicationId);
-    } catch (final Exception e) { // NOPMD Execution thrown by Flink
-      LOGGER.error("An error occured while running this job.", e);
-    }
-  }
-
   public static void main(final String[] args) {
     new HistoryServiceFlinkJob().run();
   }
...
HistoryServiceFlinkJob.java (rocks.theodolite.benchmarks.uc2.flink)

 package rocks.theodolite.benchmarks.uc2.flink;

 import com.google.common.math.Stats;
-import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
@@ -13,9 +10,8 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import rocks.theodolite.benchmarks.commons.commons.configuration.ServiceConfigurations;
+import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService;
 import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory;
-import rocks.theodolite.benchmarks.commons.flink.StateBackends;
 import rocks.theodolite.benchmarks.commons.flink.serialization.StatsSerializer;
 import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
@@ -23,51 +19,13 @@ import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
 /**
  * The History microservice implemented as a Flink job.
  */
-public final class HistoryServiceFlinkJob {
+public final class HistoryServiceFlinkJob extends AbstractFlinkService {

   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);

-  private final Configuration config = ServiceConfigurations.createWithDefaults();
-  private final StreamExecutionEnvironment env;
-  private final String applicationId;
-
-  /**
-   * Create a new instance of the {@link HistoryServiceFlinkJob}.
-   */
-  public HistoryServiceFlinkJob() {
-    final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
-    final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    this.applicationId = applicationName + "-" + applicationVersion;
-    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
-    this.configureEnv();
-    this.buildPipeline();
-  }
-
-  private void configureEnv() {
-    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
-    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
-    if (checkpointing) {
-      this.env.enableCheckpointing(commitIntervalMs);
-    }
-
-    // Parallelism
-    final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
-    if (parallelism != null) {
-      LOGGER.info("Set parallelism: {}.", parallelism);
-      this.env.setParallelism(parallelism);
-    }
-
-    // State Backend
-    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
-    this.env.setStateBackend(stateBackend);
-
-    this.configureSerializers();
-  }
-
-  private void configureSerializers() {
+  @Override
+  protected void configureSerializers() {
     this.env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
     this.env.getConfig().getRegisteredTypesWithKryoSerializers()
         .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
@@ -75,7 +33,8 @@ public final class HistoryServiceFlinkJob {
   }

-  private void buildPipeline() {
+  @Override
+  protected void buildPipeline() {
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
@@ -112,20 +71,6 @@ public final class HistoryServiceFlinkJob {
         .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
   }

-  /**
-   * Start running this microservice.
-   */
-  public void run() {
-    LOGGER.info("Execution plan: {}", this.env.getExecutionPlan());
-
-    try {
-      this.env.execute(this.applicationId);
-    } catch (final Exception e) { // NOPMD Execution thrown by Flink
-      LOGGER.error("An error occured while running this job.", e);
-    }
-  }
-
   public static void main(final String[] args) {
     new HistoryServiceFlinkJob().run();
   }
...
HistoryServiceFlinkJob.java (rocks.theodolite.benchmarks.uc3.flink)

@@ -4,12 +4,9 @@ import com.google.common.math.Stats;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
-import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
@@ -17,9 +14,8 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import rocks.theodolite.benchmarks.commons.commons.configuration.ServiceConfigurations;
+import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService;
 import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory;
-import rocks.theodolite.benchmarks.commons.flink.StateBackends;
 import rocks.theodolite.benchmarks.commons.flink.serialization.StatsSerializer;
 import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
 import rocks.theodolite.benchmarks.uc3.flink.util.HourOfDayKey;
@@ -30,51 +26,12 @@ import rocks.theodolite.benchmarks.uc3.flink.util.StatsKeyFactory;
 /**
  * The History microservice implemented as a Flink job.
  */
-public final class HistoryServiceFlinkJob {
+public final class HistoryServiceFlinkJob extends AbstractFlinkService {

   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);

-  private final Configuration config = ServiceConfigurations.createWithDefaults();
-  private final StreamExecutionEnvironment env;
-  private final String applicationId;
-
-  /**
-   * Create a new instance of the {@link HistoryServiceFlinkJob}.
-   */
-  public HistoryServiceFlinkJob() {
-    final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
-    final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    this.applicationId = applicationName + "-" + applicationVersion;
-    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
-    this.configureEnv();
-    this.buildPipeline();
-  }
-
-  private void configureEnv() {
-    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
-    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
-    if (checkpointing) {
-      this.env.enableCheckpointing(commitIntervalMs);
-    }
-
-    // Parallelism
-    final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
-    if (parallelism != null) {
-      LOGGER.error("Set parallelism: {}.", parallelism);
-      this.env.setParallelism(parallelism);
-    }
-
-    // State Backend
-    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
-    this.env.setStateBackend(stateBackend);
-
-    this.configureSerializers();
-  }
-
-  private void configureSerializers() {
+  @Override
+  protected void configureSerializers() {
     this.env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class,
         new HourOfDayKeySerde());
     this.env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
@@ -86,7 +43,8 @@ public final class HistoryServiceFlinkJob {
     }
   }

-  private void buildPipeline() {
+  @Override
+  protected void buildPipeline() {
     // Configurations
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
@@ -135,21 +93,6 @@ public final class HistoryServiceFlinkJob {
         .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
   }

-  /**
-   * Start running this microservice.
-   */
-  public void run() {
-    // Execution plan
-    LOGGER.info("Execution Plan: {}", this.env.getExecutionPlan());
-
-    // Execute Job
-    try {
-      this.env.execute(this.applicationId);
-    } catch (final Exception e) { // NOPMD Execution thrown by Flink
-      LOGGER.error("An error occured while running this job.", e);
-    }
-  }
-
   public static void main(final String[] args) {
     new HistoryServiceFlinkJob().run();
   }
...
AggregationServiceFlinkJob.java (rocks.theodolite.benchmarks.uc4.flink)

@@ -2,15 +2,12 @@ package rocks.theodolite.benchmarks.uc4.flink; // NOPMD Imports required
 import java.time.Duration;
 import java.util.Set;
-import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
@@ -19,11 +16,10 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import rocks.theodolite.benchmarks.commons.commons.configuration.ServiceConfigurations;
 import rocks.theodolite.benchmarks.commons.configuration.events.Event;
 import rocks.theodolite.benchmarks.commons.configuration.events.EventSerde;
+import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService;
 import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory;
-import rocks.theodolite.benchmarks.commons.flink.StateBackends;
 import rocks.theodolite.benchmarks.commons.flink.TupleType;
 import rocks.theodolite.benchmarks.commons.kafka.avro.SchemaRegistryAvroSerdeFactory;
 import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
@@ -38,57 +34,12 @@ import rocks.theodolite.benchmarks.uc4.flink.util.SensorParentKeySerializer;
 /**
  * The Aggregation microservice implemented as a Flink job.
  */
-public final class AggregationServiceFlinkJob {
+public final class AggregationServiceFlinkJob extends AbstractFlinkService {

   private static final Logger LOGGER = LoggerFactory.getLogger(AggregationServiceFlinkJob.class);

-  private final Configuration config = ServiceConfigurations.createWithDefaults();
-  private final StreamExecutionEnvironment env;
-  private final String applicationId;
-
-  /**
-   * Create a new {@link AggregationServiceFlinkJob}.
-   */
-  public AggregationServiceFlinkJob() {
-    final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
-    final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    this.applicationId = applicationName + "-" + applicationVersion;
-
-    // Execution environment configuration
-    // org.apache.flink.configuration.Configuration conf = new
-    // org.apache.flink.configuration.Configuration();
-    // conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
-    // final StreamExecutionEnvironment env =
-    // StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
-    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-    this.configureEnv();
-    this.buildPipeline();
-  }
-
-  private void configureEnv() {
-    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
-    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
-    if (checkpointing) {
-      this.env.enableCheckpointing(commitIntervalMs);
-    }
-
-    // Parallelism
-    final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
-    if (parallelism != null) {
-      LOGGER.info("Set parallelism: {}.", parallelism);
-      this.env.setParallelism(parallelism);
-    }
-
-    // State Backend
-    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
-    this.env.setStateBackend(stateBackend);
-
-    this.configureSerializers();
-  }
-
-  private void configureSerializers() {
+  @Override
+  protected void configureSerializers() {
     this.env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class,
         new ImmutableSensorRegistrySerializer());
     this.env.getConfig().registerTypeWithKryoSerializer(SensorParentKey.class,
@@ -108,7 +59,8 @@ public final class AggregationServiceFlinkJob {
         s.getSerializer().getClass().getName()));
   }

-  private void buildPipeline() {
+  @Override
+  protected void buildPipeline() {
     // Get configurations
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
@@ -204,21 +156,6 @@ public final class AggregationServiceFlinkJob {
         .addSink(kafkaAggregationSink).name("[Kafka Producer] Topic: " + outputTopic);
   }

-  /**
-   * Start running this microservice.
-   */
-  public void run() {
-    // Execution plan
-    LOGGER.info("Execution plan: {}", this.env.getExecutionPlan());
-
-    // Execute Job
-    try {
-      this.env.execute(this.applicationId);
-    } catch (final Exception e) { // NOPMD Execution thrown by Flink
-      LOGGER.error("An error occured while running this job.", e);
-    }
-  }
-
   public static void main(final String[] args) {
     new AggregationServiceFlinkJob().run();
   }
...