diff --git a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java new file mode 100644 index 0000000000000000000000000000000000000000..f348543cd9897bc3abf1871ce828c22ea531dd4c --- /dev/null +++ b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java @@ -0,0 +1,114 @@ +package rocks.theodolite.benchmarks.commons.flink; + +import org.apache.commons.configuration2.Configuration; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import titan.ccp.common.configuration.ServiceConfigurations; + +/** + * A general Apache Flink-based microservice. It is configured by {@link #configureEnv()}, and + * extended by implementing business logic in {@link #buildPipeline()}. The configuration of the + * serializer needs to be implemented in {@link #configureSerializers()}. + */ +public abstract class AbstractFlinkService { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractFlinkService.class); + protected final StreamExecutionEnvironment env; + + protected Configuration config = ServiceConfigurations.createWithDefaults(); + + protected final String applicationId; + + /** + * Abstract Service constructing the name and {@link StreamExecutionEnvironment}. + */ + public AbstractFlinkService() { + final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); + final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION); + this.applicationId = applicationName + "-" + applicationVersion; + + this.env = StreamExecutionEnvironment.getExecutionEnvironment(); + + } + + /** + * Abstract Service constructing the name and {@link StreamExecutionEnvironment}. + * + * @param config the configuration for the service. + */ + public AbstractFlinkService(final Configuration config) { + this.config = config; + final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); + final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION); + this.applicationId = applicationName + "-" + applicationVersion; + this.env = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + + + /** + * Configures the service using environment variables. + */ + protected void configureEnv() { + this.configureCheckpointing(); + this.configureParallelism(); + this.configureStateBackend(); + this.configureSerializers(); + } + + protected void configureCheckpointing() { + final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); + final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS); + LOGGER.info("Set parallelism to: {}.", checkpointing); + if (checkpointing) { + this.env.enableCheckpointing(commitIntervalMs); + } + } + + /** + * Configures the parallelism according to the configuration. + */ + protected void configureParallelism() { + final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); + if (parallelism != null) { + LOGGER.info("Set parallelism: {}.", parallelism); + this.env.setParallelism(parallelism); + } + } + + /** + * Configures the state backend according to the configuration. + */ + public void configureStateBackend() { + LOGGER.info("Enable state backend."); + final StateBackend stateBackend = StateBackends.fromConfiguration(this.config); + this.env.setStateBackend(stateBackend); + } + + + protected abstract void configureSerializers(); + + /** + * Empty placeholder. Implement this method to implement the custom logic of your microservice. + */ + protected abstract void buildPipeline(); + + /** + * Starts the service. + */ + public void run() { + this.configureEnv(); + this.buildPipeline(); + LOGGER.info("Execution plan: {}", this.env.getExecutionPlan()); + + try { + this.env.execute(this.applicationId); + } catch (final Exception e) { // NOPMD Exception thrown by Flink + LOGGER.error("An error occured while running this job.", e); + } + } + + +} diff --git a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java index 8fd8fbde288d1750fb1bab2147885d7be6245316..9eb143c3c07f879de37eafa2fbe6729bf182d45e 100644 --- a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java +++ b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java @@ -5,6 +5,26 @@ package rocks.theodolite.benchmarks.commons.flink; */ public final class ConfigurationKeys { + public static final String APPLICATION_NAME = "application.name"; + + public static final String APPLICATION_VERSION = "application.version"; + + public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic"; + + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; + + public static final String WINDOW_SIZE_MS = "window.size.ms"; + + public static final String WINDOW_GRACE_MS = "window.grace.ms"; + + public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; + public static final String FLINK_STATE_BACKEND = "flink.state.backend"; public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path"; @@ -12,7 +32,11 @@ public final class ConfigurationKeys { public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD "flink.state.backend.memory.size"; - public static final String FLINK_CHECKPOINTING = "checkpointing"; + public static final String DEBUG = "debug"; + + public static final String CHECKPOINTING = "checkpointing"; + + public static final String PARALLELISM = "parallelism"; private ConfigurationKeys() {} diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java index 83936d7482058e7db2ae6ebf2292ff3c22dbda09..9d3412c7f7a318b471902f9f2f38e714bf1034ec 100644 --- a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java @@ -1,13 +1,9 @@ package rocks.theodolite.benchmarks.uc1.flink; -import org.apache.commons.configuration2.Configuration; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rocks.theodolite.benchmarks.commons.commons.configuration.ServiceConfigurations; +import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService; import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; @@ -16,48 +12,24 @@ import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; /** * The History microservice implemented as a Flink job. */ -public final class HistoryServiceFlinkJob { - - private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class); - - private final Configuration config = ServiceConfigurations.createWithDefaults(); - private final StreamExecutionEnvironment env; - private final String applicationId; +public final class HistoryServiceFlinkJob extends AbstractFlinkService { private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson(); - /** - * Create a new instance of the {@link HistoryServiceFlinkJob}. - */ - public HistoryServiceFlinkJob() { - final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); - final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION); - this.applicationId = applicationName + "-" + applicationVersion; - - this.env = StreamExecutionEnvironment.getExecutionEnvironment(); - - this.configureEnv(); - - this.buildPipeline(); + @Override + public void configureEnv() { + super.configureCheckpointing(); + super.configureParallelism(); } - private void configureEnv() { - final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); - final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS); - if (checkpointing) { - this.env.enableCheckpointing(commitIntervalMs); - } - - // Parallelism - final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); - if (parallelism != null) { - LOGGER.info("Set parallelism: {}.", parallelism); - this.env.setParallelism(parallelism); - } - + @Override + protected void configureSerializers() { + // No serializers needed here } - private void buildPipeline() { + + @Override + protected void buildPipeline() { final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); @@ -79,17 +51,6 @@ public final class HistoryServiceFlinkJob { .returns(Types.VOID); // Will never be used } - /** - * Start running this microservice. - */ - public void run() { - try { - this.env.execute(this.applicationId); - } catch (final Exception e) { // NOPMD Execution thrown by Flink - LOGGER.error("An error occured while running this job.", e); - } - } - public static void main(final String[] args) { new HistoryServiceFlinkJob().run(); } diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java index 30d6081534b04be6e33af96d42f5c783111e1e3e..5a34d17a89186630afb0917e16940210b84fd5e8 100644 --- a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java @@ -1,11 +1,8 @@ package rocks.theodolite.benchmarks.uc2.flink; import com.google.common.math.Stats; -import org.apache.commons.configuration2.Configuration; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; @@ -13,9 +10,8 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.common.serialization.Serdes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import rocks.theodolite.benchmarks.commons.commons.configuration.ServiceConfigurations; +import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService; import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory; -import rocks.theodolite.benchmarks.commons.flink.StateBackends; import rocks.theodolite.benchmarks.commons.flink.serialization.StatsSerializer; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; @@ -23,51 +19,13 @@ import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; /** * The History microservice implemented as a Flink job. */ -public final class HistoryServiceFlinkJob { +public final class HistoryServiceFlinkJob extends AbstractFlinkService { private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class); - private final Configuration config = ServiceConfigurations.createWithDefaults(); - private final StreamExecutionEnvironment env; - private final String applicationId; - /** - * Create a new instance of the {@link HistoryServiceFlinkJob}. - */ - public HistoryServiceFlinkJob() { - final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); - final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION); - this.applicationId = applicationName + "-" + applicationVersion; - - this.env = StreamExecutionEnvironment.getExecutionEnvironment(); - - this.configureEnv(); - - this.buildPipeline(); - } - - private void configureEnv() { - final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); - final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS); - if (checkpointing) { - this.env.enableCheckpointing(commitIntervalMs); - } - - // Parallelism - final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); - if (parallelism != null) { - LOGGER.info("Set parallelism: {}.", parallelism); - this.env.setParallelism(parallelism); - } - - // State Backend - final StateBackend stateBackend = StateBackends.fromConfiguration(this.config); - this.env.setStateBackend(stateBackend); - - this.configureSerializers(); - } - - private void configureSerializers() { + @Override + protected void configureSerializers() { this.env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer()); this.env.getConfig().getRegisteredTypesWithKryoSerializers() .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer " @@ -75,7 +33,8 @@ public final class HistoryServiceFlinkJob { } - private void buildPipeline() { + @Override + protected void buildPipeline() { final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); @@ -112,20 +71,6 @@ public final class HistoryServiceFlinkJob { .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic); } - - /** - * Start running this microservice. - */ - public void run() { - LOGGER.info("Execution plan: {}", this.env.getExecutionPlan()); - - try { - this.env.execute(this.applicationId); - } catch (final Exception e) { // NOPMD Execution thrown by Flink - LOGGER.error("An error occured while running this job.", e); - } - } - public static void main(final String[] args) { new HistoryServiceFlinkJob().run(); } diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java index 4554d505c7a8c90dbf7fbc075d5877b1fa49412b..d80f64fafb69d3e0287347a8f90080584d4fcd82 100644 --- a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java @@ -4,12 +4,9 @@ import com.google.common.math.Stats; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; -import org.apache.commons.configuration2.Configuration; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; @@ -17,9 +14,8 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.common.serialization.Serdes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import rocks.theodolite.benchmarks.commons.commons.configuration.ServiceConfigurations; +import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService; import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory; -import rocks.theodolite.benchmarks.commons.flink.StateBackends; import rocks.theodolite.benchmarks.commons.flink.serialization.StatsSerializer; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; import rocks.theodolite.benchmarks.uc3.flink.util.HourOfDayKey; @@ -30,51 +26,12 @@ import rocks.theodolite.benchmarks.uc3.flink.util.StatsKeyFactory; /** * The History microservice implemented as a Flink job. */ -public final class HistoryServiceFlinkJob { +public final class HistoryServiceFlinkJob extends AbstractFlinkService { private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class); - private final Configuration config = ServiceConfigurations.createWithDefaults(); - private final StreamExecutionEnvironment env; - private final String applicationId; - - /** - * Create a new instance of the {@link HistoryServiceFlinkJob}. - */ - public HistoryServiceFlinkJob() { - final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); - final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION); - this.applicationId = applicationName + "-" + applicationVersion; - - this.env = StreamExecutionEnvironment.getExecutionEnvironment(); - - this.configureEnv(); - - this.buildPipeline(); - } - - private void configureEnv() { - final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); - final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS); - if (checkpointing) { - this.env.enableCheckpointing(commitIntervalMs); - } - - // Parallelism - final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); - if (parallelism != null) { - LOGGER.error("Set parallelism: {}.", parallelism); - this.env.setParallelism(parallelism); - } - - // State Backend - final StateBackend stateBackend = StateBackends.fromConfiguration(this.config); - this.env.setStateBackend(stateBackend); - - this.configureSerializers(); - } - - private void configureSerializers() { + @Override + protected void configureSerializers() { this.env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class, new HourOfDayKeySerde()); this.env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer()); @@ -86,7 +43,8 @@ public final class HistoryServiceFlinkJob { } } - private void buildPipeline() { + @Override + protected void buildPipeline() { // Configurations final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); @@ -135,21 +93,6 @@ public final class HistoryServiceFlinkJob { .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic); } - /** - * Start running this microservice. - */ - public void run() { - // Execution plan - LOGGER.info("Execution Plan: {}", this.env.getExecutionPlan()); - - // Execute Job - try { - this.env.execute(this.applicationId); - } catch (final Exception e) { // NOPMD Execution thrown by Flink - LOGGER.error("An error occured while running this job.", e); - } - } - public static void main(final String[] args) { new HistoryServiceFlinkJob().run(); } diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java index fd6d073bb5a4f7948e06e081a94eaa027be036b5..5f4515cb851439841d1de3193f21275545033481 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java @@ -2,15 +2,12 @@ package rocks.theodolite.benchmarks.uc4.flink; // NOPMD Imports required import java.time.Duration; import java.util.Set; -import org.apache.commons.configuration2.Configuration; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; @@ -19,11 +16,10 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.common.serialization.Serdes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import rocks.theodolite.benchmarks.commons.commons.configuration.ServiceConfigurations; import rocks.theodolite.benchmarks.commons.configuration.events.Event; import rocks.theodolite.benchmarks.commons.configuration.events.EventSerde; +import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService; import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory; -import rocks.theodolite.benchmarks.commons.flink.StateBackends; import rocks.theodolite.benchmarks.commons.flink.TupleType; import rocks.theodolite.benchmarks.commons.kafka.avro.SchemaRegistryAvroSerdeFactory; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; @@ -38,57 +34,12 @@ import rocks.theodolite.benchmarks.uc4.flink.util.SensorParentKeySerializer; /** * The Aggregation microservice implemented as a Flink job. */ -public final class AggregationServiceFlinkJob { +public final class AggregationServiceFlinkJob extends AbstractFlinkService { private static final Logger LOGGER = LoggerFactory.getLogger(AggregationServiceFlinkJob.class); - private final Configuration config = ServiceConfigurations.createWithDefaults(); - private final StreamExecutionEnvironment env; - private final String applicationId; - - /** - * Create a new {@link AggregationServiceFlinkJob}. - */ - public AggregationServiceFlinkJob() { - final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); - final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION); - this.applicationId = applicationName + "-" + applicationVersion; - - // Execution environment configuration - // org.apache.flink.configuration.Configuration conf = new - // org.apache.flink.configuration.Configuration(); - // conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); - // final StreamExecutionEnvironment env = - // StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); - this.env = StreamExecutionEnvironment.getExecutionEnvironment(); - - this.configureEnv(); - - this.buildPipeline(); - } - - private void configureEnv() { - final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); - final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS); - if (checkpointing) { - this.env.enableCheckpointing(commitIntervalMs); - } - - // Parallelism - final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); - if (parallelism != null) { - LOGGER.info("Set parallelism: {}.", parallelism); - this.env.setParallelism(parallelism); - } - - // State Backend - final StateBackend stateBackend = StateBackends.fromConfiguration(this.config); - this.env.setStateBackend(stateBackend); - - this.configureSerializers(); - } - - private void configureSerializers() { + @Override + protected void configureSerializers() { this.env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class, new ImmutableSensorRegistrySerializer()); this.env.getConfig().registerTypeWithKryoSerializer(SensorParentKey.class, @@ -108,7 +59,8 @@ public final class AggregationServiceFlinkJob { s.getSerializer().getClass().getName())); } - private void buildPipeline() { + @Override + protected void buildPipeline() { // Get configurations final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); @@ -204,21 +156,6 @@ public final class AggregationServiceFlinkJob { .addSink(kafkaAggregationSink).name("[Kafka Producer] Topic: " + outputTopic); } - /** - * Start running this microservice. - */ - public void run() { - // Execution plan - LOGGER.info("Execution plan: {}", this.env.getExecutionPlan()); - - // Execute Job - try { - this.env.execute(this.applicationId); - } catch (final Exception e) { // NOPMD Execution thrown by Flink - LOGGER.error("An error occured while running this job.", e); - } - } - public static void main(final String[] args) { new AggregationServiceFlinkJob().run(); }