Skip to content
Snippets Groups Projects
Commit fb70ae26 authored by Sören Henning's avatar Sören Henning
Browse files

Resolve code quality issues in UC1

parent da5ece7a
No related branches found
No related tags found
1 merge request: !90 "Migrate Flink benchmark implementation"
Pipeline #2213 failed
package theodolite.uc1.application;
import java.util.Properties;
import org.apache.commons.configuration2.Configuration;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import titan.ccp.common.configuration.Configurations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.configuration.ServiceConfigurations;
import titan.ccp.model.records.ActivePowerRecord;
import java.util.Properties;
/**
 * The History microservice implemented as a Flink job.
 */
// NOTE(review): this span is a GitLab diff rendering, not plain source --
// removed and added lines are interleaved and the "@@" hunk markers elide
// code, so this block does not compile as shown. Comments below label the
// visible diff sides; the merged file keeps only the "added" lines.
public class HistoryServiceFlinkJob {
// Old config initialization (removed side of the diff) -- superseded by the
// ServiceConfigurations-based field two lines below; only one survives the merge.
private final Configuration config = Configurations.create();
// SLF4J logger introduced by this commit; used in run() to log job-execution
// failures instead of printing stack traces.
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
// New config initialization (added side of the diff).
private final Configuration config = ServiceConfigurations.createWithDefaults();
// Builds and executes the Flink job: configures a Kafka consumer that
// deserializes ActivePowerRecord events via the Confluent Schema Registry,
// then runs the stream on the execution environment.
private void run() {
final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
// Diff hunk marker: lines elided here (presumably applicationId/kafkaBroker
// lookup and kafkaProps construction -- not visible in this view; confirm
// against the full file).
......@@ -32,12 +35,6 @@ public class HistoryServiceFlinkJob {
kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
kafkaProps.setProperty("group.id", applicationId);
// Commented-out legacy deserializer, deleted by this commit in favor of the
// Confluent-Registry Avro schema constructed below.
/*
* final DeserializationSchema<ActivePowerRecord> serde = new
* FlinkMonitoringRecordSerde<>(inputTopic, ActivePowerRecord.class,
* ActivePowerRecordFactory.class);
*/
final DeserializationSchema<ActivePowerRecord> serde =
ConfluentRegistryAvroDeserializationSchema.forSpecific(
ActivePowerRecord.class,
// Diff hunk marker: remaining forSpecific(...) arguments (likely the
// schema-registry URL) and surrounding lines elided here.
......@@ -46,12 +43,14 @@ public class HistoryServiceFlinkJob {
final FlinkKafkaConsumer<ActivePowerRecord> kafkaConsumer =
new FlinkKafkaConsumer<>(inputTopic, serde, kafkaProps);
kafkaConsumer.setStartFromGroupOffsets();
// Code-quality fix in this commit: the unbraced if (removed line) is
// replaced by the braced form (added lines). Behavior is unchanged: commit
// Kafka offsets on checkpoints only when checkpointing is enabled.
if (checkpointing)
if (checkpointing) {
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Same brace fix: enable periodic checkpointing at the configured interval.
if (checkpointing)
if (checkpointing) {
env.enableCheckpointing(commitIntervalMs);
}
final DataStream<ActivePowerRecord> stream = env.addSource(kafkaConsumer);
// Diff hunk marker: the stream-processing/sink logic of the job is elided here.
......@@ -65,8 +64,8 @@ public class HistoryServiceFlinkJob {
try {
env.execute(applicationId);
// Old handler (removed): swallowed the failure with printStackTrace().
} catch (Exception e) {
e.printStackTrace();
// New handler (added): log via SLF4J with the exception as cause. The broad
// Exception catch matches what StreamExecutionEnvironment.execute declares
// (hence the NOPMD suppression). Note: "occured" typo lives in the runtime
// log message, so it is left untouched here.
} catch (final Exception e) { // NOPMD Execution thrown by Flink
LOGGER.error("An error occured while running this job.", e);
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment