Skip to content
Snippets Groups Projects
Commit 4513127c authored by Lorenz Boguhn's avatar Lorenz Boguhn Committed by Lorenz Boguhn
Browse files

Refactor flink to use AbstractFlinkService

parent 0babb6b3
No related branches found
No related tags found
1 merge request!272Introduce Abstract Flink Service Class
package rocks.theodolite.benchmarks.uc1.flink;
import org.apache.commons.configuration2.Configuration;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService;
import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
import titan.ccp.common.configuration.ServiceConfigurations;
import titan.ccp.model.records.ActivePowerRecord;
/**
* The History microservice implemented as a Flink job.
*/
public final class HistoryServiceFlinkJob {
public final class HistoryServiceFlinkJob extends AbstractFlinkService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
private final Configuration config = ServiceConfigurations.createWithDefaults();
private final StreamExecutionEnvironment env;
private final String applicationId;
private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
private static final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
/**
* Create a new instance of the {@link HistoryServiceFlinkJob}.
*/
public HistoryServiceFlinkJob() {
final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
this.applicationId = applicationName + "-" + applicationVersion;
this.env = StreamExecutionEnvironment.getExecutionEnvironment();
this.configureEnv();
this.buildPipeline();
super();
}
private void configureEnv() {
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
if (checkpointing) {
this.env.enableCheckpointing(commitIntervalMs);
}
// Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) {
LOGGER.info("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism);
}
public void configureEnv() {
super.configureCheckpointing();
super.configureParallelism();
}
protected void configureSerializers() {
}
private void buildPipeline() {
protected void buildPipeline() {
final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
......@@ -79,17 +56,6 @@ public final class HistoryServiceFlinkJob {
.returns(Types.VOID); // Will never be used
}
/**
* Start running this microservice.
*/
public void run() {
try {
this.env.execute(this.applicationId);
} catch (final Exception e) { // NOPMD Execution thrown by Flink
LOGGER.error("An error occured while running this job.", e);
}
}
public static void main(final String[] args) {
new HistoryServiceFlinkJob().run();
}
......
package rocks.theodolite.benchmarks.uc2.flink;
import com.google.common.math.Stats;
import org.apache.commons.configuration2.Configuration;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
......@@ -13,61 +10,28 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.serialization.Serdes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService;
import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory;
import rocks.theodolite.benchmarks.commons.flink.StateBackends;
import rocks.theodolite.benchmarks.commons.flink.serialization.StatsSerializer;
import titan.ccp.common.configuration.ServiceConfigurations;
import titan.ccp.model.records.ActivePowerRecord;
/**
* The History microservice implemented as a Flink job.
*/
public final class HistoryServiceFlinkJob {
public final class HistoryServiceFlinkJob extends AbstractFlinkService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
private final Configuration config = ServiceConfigurations.createWithDefaults();
private final StreamExecutionEnvironment env;
private final String applicationId;
/**
* Create a new instance of the {@link HistoryServiceFlinkJob}.
*/
public HistoryServiceFlinkJob() {
final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
this.applicationId = applicationName + "-" + applicationVersion;
this.env = StreamExecutionEnvironment.getExecutionEnvironment();
this.configureEnv();
this.buildPipeline();
}
private void configureEnv() {
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
if (checkpointing) {
this.env.enableCheckpointing(commitIntervalMs);
}
// Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) {
LOGGER.info("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism);
}
// State Backend
final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
this.env.setStateBackend(stateBackend);
this.configureSerializers();
super();
}
private void configureSerializers() {
protected void configureSerializers() {
this.env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
this.env.getConfig().getRegisteredTypesWithKryoSerializers()
.forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
......@@ -75,7 +39,8 @@ public final class HistoryServiceFlinkJob {
}
private void buildPipeline() {
protected void buildPipeline() {
final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
......@@ -112,20 +77,6 @@ public final class HistoryServiceFlinkJob {
.addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
}
/**
* Start running this microservice.
*/
public void run() {
LOGGER.info("Execution plan: {}", this.env.getExecutionPlan());
try {
this.env.execute(this.applicationId);
} catch (final Exception e) { // NOPMD Execution thrown by Flink
LOGGER.error("An error occured while running this job.", e);
}
}
public static void main(final String[] args) {
new HistoryServiceFlinkJob().run();
}
......
......@@ -4,12 +4,9 @@ import com.google.common.math.Stats;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import org.apache.commons.configuration2.Configuration;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
......@@ -17,64 +14,30 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.serialization.Serdes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService;
import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory;
import rocks.theodolite.benchmarks.commons.flink.StateBackends;
import rocks.theodolite.benchmarks.commons.flink.serialization.StatsSerializer;
import rocks.theodolite.benchmarks.uc3.flink.util.HourOfDayKey;
import rocks.theodolite.benchmarks.uc3.flink.util.HourOfDayKeyFactory;
import rocks.theodolite.benchmarks.uc3.flink.util.HourOfDayKeySerde;
import rocks.theodolite.benchmarks.uc3.flink.util.StatsKeyFactory;
import titan.ccp.common.configuration.ServiceConfigurations;
import titan.ccp.model.records.ActivePowerRecord;
/**
* The History microservice implemented as a Flink job.
*/
public final class HistoryServiceFlinkJob {
public final class HistoryServiceFlinkJob extends AbstractFlinkService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
private final Configuration config = ServiceConfigurations.createWithDefaults();
private final StreamExecutionEnvironment env;
private final String applicationId;
/**
* Create a new instance of the {@link HistoryServiceFlinkJob}.
*/
public HistoryServiceFlinkJob() {
final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
this.applicationId = applicationName + "-" + applicationVersion;
this.env = StreamExecutionEnvironment.getExecutionEnvironment();
this.configureEnv();
this.buildPipeline();
}
private void configureEnv() {
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
if (checkpointing) {
this.env.enableCheckpointing(commitIntervalMs);
}
// Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism);
}
// State Backend
final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
this.env.setStateBackend(stateBackend);
this.configureSerializers();
super();
}
private void configureSerializers() {
protected void configureSerializers() {
this.env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class,
new HourOfDayKeySerde());
this.env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
......@@ -86,7 +49,7 @@ public final class HistoryServiceFlinkJob {
}
}
private void buildPipeline() {
protected void buildPipeline() {
// Configurations
final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
......@@ -135,21 +98,6 @@ public final class HistoryServiceFlinkJob {
.addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
}
/**
* Start running this microservice.
*/
public void run() {
// Execution plan
LOGGER.info("Execution Plan: {}", this.env.getExecutionPlan());
// Execute Job
try {
this.env.execute(this.applicationId);
} catch (final Exception e) { // NOPMD Execution thrown by Flink
LOGGER.error("An error occured while running this job.", e);
}
}
public static void main(final String[] args) {
new HistoryServiceFlinkJob().run();
}
......
......@@ -2,15 +2,12 @@ package rocks.theodolite.benchmarks.uc4.flink; // NOPMD Imports required
import java.time.Duration;
import java.util.Set;
import org.apache.commons.configuration2.Configuration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
......@@ -19,14 +16,13 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.serialization.Serdes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService;
import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory;
import rocks.theodolite.benchmarks.commons.flink.StateBackends;
import rocks.theodolite.benchmarks.commons.flink.TupleType;
import rocks.theodolite.benchmarks.uc4.flink.util.ImmutableSensorRegistrySerializer;
import rocks.theodolite.benchmarks.uc4.flink.util.ImmutableSetSerializer;
import rocks.theodolite.benchmarks.uc4.flink.util.SensorParentKey;
import rocks.theodolite.benchmarks.uc4.flink.util.SensorParentKeySerializer;
import titan.ccp.common.configuration.ServiceConfigurations;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.configuration.events.Event;
import titan.ccp.configuration.events.EventSerde;
......@@ -38,57 +34,18 @@ import titan.ccp.model.sensorregistry.SensorRegistry;
/**
* The Aggregation microservice implemented as a Flink job.
*/
public final class AggregationServiceFlinkJob {
public final class AggregationServiceFlinkJob extends AbstractFlinkService {
private static final Logger LOGGER = LoggerFactory.getLogger(AggregationServiceFlinkJob.class);
private final Configuration config = ServiceConfigurations.createWithDefaults();
private final StreamExecutionEnvironment env;
private final String applicationId;
/**
* Create a new {@link AggregationServiceFlinkJob}.
*/
public AggregationServiceFlinkJob() {
final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
this.applicationId = applicationName + "-" + applicationVersion;
// Execution environment configuration
// org.apache.flink.configuration.Configuration conf = new
// org.apache.flink.configuration.Configuration();
// conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
// final StreamExecutionEnvironment env =
// StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
this.env = StreamExecutionEnvironment.getExecutionEnvironment();
this.configureEnv();
this.buildPipeline();
super();
}
private void configureEnv() {
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
if (checkpointing) {
this.env.enableCheckpointing(commitIntervalMs);
}
// Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) {
LOGGER.info("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism);
}
// State Backend
final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
this.env.setStateBackend(stateBackend);
this.configureSerializers();
}
private void configureSerializers() {
protected void configureSerializers() {
this.env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class,
new ImmutableSensorRegistrySerializer());
this.env.getConfig().registerTypeWithKryoSerializer(SensorParentKey.class,
......@@ -108,7 +65,7 @@ public final class AggregationServiceFlinkJob {
s.getSerializer().getClass().getName()));
}
private void buildPipeline() {
protected void buildPipeline() {
// Get configurations
final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
......@@ -204,21 +161,6 @@ public final class AggregationServiceFlinkJob {
.addSink(kafkaAggregationSink).name("[Kafka Producer] Topic: " + outputTopic);
}
/**
* Start running this microservice.
*/
public void run() {
// Execution plan
LOGGER.info("Execution plan: {}", this.env.getExecutionPlan());
// Execute Job
try {
this.env.execute(this.applicationId);
} catch (final Exception e) { // NOPMD Execution thrown by Flink
LOGGER.error("An error occured while running this job.", e);
}
}
public static void main(final String[] args) {
new AggregationServiceFlinkJob().run();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment