diff --git a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
index 10546b515a645c6f0c7033b4166012b3bc587fca..5118aefc0bfc53ba5645ef1a013db4afab9cb44f 100644
--- a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
@@ -18,29 +18,45 @@ public class HistoryServiceFlinkJob {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
 
   private final Configuration config = ServiceConfigurations.createWithDefaults();
+  private final StreamExecutionEnvironment env;
+  private final String applicationId;
 
-  private void run() {
+  /**
+   * Create a new instance of the {@link HistoryServiceFlinkJob}.
+   */
+  public HistoryServiceFlinkJob() {
     final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
     final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    final String applicationId = applicationName + "-" + applicationVersion;
+    this.applicationId = applicationName + "-" + applicationVersion;
+
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    this.configureEnv();
+
+    this.buildPipeline();
+  }
+
+  private void configureEnv() {
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
     final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
+    if (checkpointing) {
+      this.env.enableCheckpointing(commitIntervalMs);
+    }
+  }
+
+  private void buildPipeline() {
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
-    final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
+    final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
     final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
-        applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
+        this.applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
 
     final FlinkKafkaConsumer<ActivePowerRecord> kafkaConsumer =
         kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
-    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-    if (checkpointing) {
-      env.enableCheckpointing(commitIntervalMs);
-    }
-
-    final DataStream<ActivePowerRecord> stream = env.addSource(kafkaConsumer);
+    final DataStream<ActivePowerRecord> stream = this.env.addSource(kafkaConsumer);
 
     stream
         .rebalance()
@@ -49,9 +65,14 @@ public class HistoryServiceFlinkJob {
             + "timestamp: " + v.getTimestamp() + ", "
            + "valueInW: " + v.getValueInW() + " }")
         .print();
+  }
 
+  /**
+   * Start running this microservice.
+   */
+  public void run() {
     try {
-      env.execute(applicationId);
+      this.env.execute(this.applicationId);
     } catch (final Exception e) { // NOPMD Execution thrown by Flink
       LOGGER.error("An error occured while running this job.", e);
     }
diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
index 1a7e6a2dd20b4dfbbde841a84829a144d514cc55..b8452847df800226ad481f9309323a2a9a532939 100644
--- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
@@ -24,27 +24,63 @@ import titan.ccp.model.records.ActivePowerRecord;
 /**
  * The History microservice implemented as a Flink job.
  */
-public class HistoryServiceFlinkJob {
+public final class HistoryServiceFlinkJob {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
 
   private final Configuration config = ServiceConfigurations.createWithDefaults();
+  private final StreamExecutionEnvironment env;
+  private final String applicationId;
 
-  private void run() {
+  /**
+   * Create a new instance of the {@link HistoryServiceFlinkJob}.
+   */
+  public HistoryServiceFlinkJob() {
     final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
     final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    final String applicationId = applicationName + "-" + applicationVersion;
+    this.applicationId = applicationName + "-" + applicationVersion;
+
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    this.configureEnv();
+
+    this.buildPipeline();
+  }
+
+  private void configureEnv() {
+    this.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
     final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
+    if (checkpointing) {
+      this.env.enableCheckpointing(commitIntervalMs);
+    }
+
+    // State Backend
+    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
+    this.env.setStateBackend(stateBackend);
+
+    this.configureSerializers();
+  }
+
+  private void configureSerializers() {
+    this.env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
+    this.env.getConfig().getRegisteredTypesWithKryoSerializers()
+        .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
+            + s.getSerializer().getClass().getName()));
+
+  }
+
+  private void buildPipeline() {
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
-    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final int windowDuration = this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
-    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
 
     final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
-        applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
+        this.applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
 
     final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource =
         kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
@@ -55,22 +91,7 @@ public class HistoryServiceFlinkJob {
         Serdes::String,
         Types.TUPLE(Types.STRING, Types.STRING));
 
-    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-    if (checkpointing) {
-      env.enableCheckpointing(commitIntervalMs);
-    }
-
-    // State Backend
-    env.setStateBackend(stateBackend);
-
-    env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
-    env.getConfig().getRegisteredTypesWithKryoSerializers()
-        .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
-            + s.getSerializer().getClass().getName()));
-
-    env
+    this.env
         .addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic)
         .rebalance()
         .keyBy(ActivePowerRecord::getIdentifier)
@@ -83,11 +104,17 @@ public class HistoryServiceFlinkJob {
           return new Tuple2<>(key, value);
         }).name("map").returns(Types.TUPLE(Types.STRING, Types.STRING))
         .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
+  }
+
 
-    LOGGER.info("Execution plan: {}", env.getExecutionPlan());
+  /**
+   * Start running this microservice.
+   */
+  public void run() {
+    LOGGER.info("Execution plan: {}", this.env.getExecutionPlan());
 
     try {
-      env.execute(applicationId);
+      this.env.execute(this.applicationId);
     } catch (final Exception e) { // NOPMD Execution thrown by Flink
       LOGGER.error("An error occured while running this job.", e);
     }
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
index 25e197c7c8ec8858f2dd8d0f96dfb30191cfbbe2..0db5a3d524f74fbf22304e8f9b44fa55eead321a 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
@@ -38,7 +38,7 @@ import titan.ccp.model.sensorregistry.SensorRegistry;
 /**
  * The Aggregation microservice implemented as a Flink job.
  */
-public class AggregationServiceFlinkJob {
+public final class AggregationServiceFlinkJob {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(AggregationServiceFlinkJob.class);
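
Note: after this refactoring, each job's constructor configures the execution environment and assembles the dataflow, while execution moves to the now-public run(). The entry point is not part of this patch; a minimal sketch of what it is assumed to look like:

  public static void main(final String[] args) {
    // Constructing the job configures the StreamExecutionEnvironment and
    // builds the pipeline; run() then submits the job to Flink.
    new HistoryServiceFlinkJob().run();
  }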