From fb70ae26abe4b8ef76affb5f0265e361216c7ad3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Tue, 9 Mar 2021 17:33:21 +0100 Subject: [PATCH] Resolve code quality issues in UC1 --- .../application/HistoryServiceFlinkJob.java | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java index 24a60f535..4778acde3 100644 --- a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java @@ -1,22 +1,25 @@ package theodolite.uc1.application; +import java.util.Properties; import org.apache.commons.configuration2.Configuration; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import titan.ccp.common.configuration.Configurations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import titan.ccp.common.configuration.ServiceConfigurations; import titan.ccp.model.records.ActivePowerRecord; -import java.util.Properties; - /** - * The History Microservice Flink Job. + * The History microservice implemented as a Flink job. */ public class HistoryServiceFlinkJob { - private final Configuration config = Configurations.create(); + private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class); + + private final Configuration config = ServiceConfigurations.createWithDefaults(); private void run() { final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); @@ -32,12 +35,6 @@ public class HistoryServiceFlinkJob { kafkaProps.setProperty("bootstrap.servers", kafkaBroker); kafkaProps.setProperty("group.id", applicationId); - /* - * final DeserializationSchema<ActivePowerRecord> serde = new - * FlinkMonitoringRecordSerde<>(inputTopic, ActivePowerRecord.class, - * ActivePowerRecordFactory.class); - */ - final DeserializationSchema<ActivePowerRecord> serde = ConfluentRegistryAvroDeserializationSchema.forSpecific( ActivePowerRecord.class, @@ -46,12 +43,14 @@ public class HistoryServiceFlinkJob { final FlinkKafkaConsumer<ActivePowerRecord> kafkaConsumer = new FlinkKafkaConsumer<>(inputTopic, serde, kafkaProps); kafkaConsumer.setStartFromGroupOffsets(); - if (checkpointing) + if (checkpointing) { kafkaConsumer.setCommitOffsetsOnCheckpoints(true); + } final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - if (checkpointing) + if (checkpointing) { env.enableCheckpointing(commitIntervalMs); + } final DataStream<ActivePowerRecord> stream = env.addSource(kafkaConsumer); @@ -65,8 +64,8 @@ public class HistoryServiceFlinkJob { try { env.execute(applicationId); - } catch (Exception e) { - e.printStackTrace(); + } catch (final Exception e) { // NOPMD Execution thrown by Flink + LOGGER.error("An error occured while running this job.", e); } } -- GitLab