diff --git a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java index 24a60f535c5a2bbbdb974f223706c65963778bef..4778acde357653d07a33f43b4ff249b0d20233ad 100644 --- a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java @@ -1,22 +1,25 @@ package theodolite.uc1.application; +import java.util.Properties; import org.apache.commons.configuration2.Configuration; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import titan.ccp.common.configuration.Configurations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import titan.ccp.common.configuration.ServiceConfigurations; import titan.ccp.model.records.ActivePowerRecord; -import java.util.Properties; - /** - * The History Microservice Flink Job. + * The History microservice implemented as a Flink job. */ public class HistoryServiceFlinkJob { - private final Configuration config = Configurations.create(); + private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class); + + private final Configuration config = ServiceConfigurations.createWithDefaults(); private void run() { final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); @@ -32,12 +35,6 @@ public class HistoryServiceFlinkJob { kafkaProps.setProperty("bootstrap.servers", kafkaBroker); kafkaProps.setProperty("group.id", applicationId); - /* - * final DeserializationSchema<ActivePowerRecord> serde = new - * FlinkMonitoringRecordSerde<>(inputTopic, ActivePowerRecord.class, - * ActivePowerRecordFactory.class); - */ - final DeserializationSchema<ActivePowerRecord> serde = ConfluentRegistryAvroDeserializationSchema.forSpecific( ActivePowerRecord.class, @@ -46,12 +43,14 @@ public class HistoryServiceFlinkJob { final FlinkKafkaConsumer<ActivePowerRecord> kafkaConsumer = new FlinkKafkaConsumer<>(inputTopic, serde, kafkaProps); kafkaConsumer.setStartFromGroupOffsets(); - if (checkpointing) + if (checkpointing) { kafkaConsumer.setCommitOffsetsOnCheckpoints(true); + } final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - if (checkpointing) + if (checkpointing) { env.enableCheckpointing(commitIntervalMs); + } final DataStream<ActivePowerRecord> stream = env.addSource(kafkaConsumer); @@ -65,8 +64,8 @@ public class HistoryServiceFlinkJob { try { env.execute(applicationId); - } catch (Exception e) { - e.printStackTrace(); + } catch (final Exception e) { // NOPMD Execution thrown by Flink + LOGGER.error("An error occured while running this job.", e); } }