Commit 73536b95 authored by Sören Henning

Fix some code quality issues in UC3

parent bd851a5a
1 merge request: !90 Migrate Flink benchmark implementation
Pipeline #2215 failed
@@ -29,7 +29,8 @@ public final class ConfigurationKeys {
   public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";
-  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = "flink.state.backend.memory.size";
+  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD
+      "flink.state.backend.memory.size";
   public static final String CHECKPOINTING = "checkpointing";
 package theodolite.uc3.application;

 import com.google.common.math.Stats;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -25,29 +30,22 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
 import theodolite.commons.flink.serialization.StatsSerializer;
-import theodolite.uc3.application.ConfigurationKeys;
 import theodolite.uc3.application.util.HourOfDayKey;
 import theodolite.uc3.application.util.HourOfDayKeyFactory;
 import theodolite.uc3.application.util.HourOfDayKeySerde;
 import theodolite.uc3.application.util.StatsKeyFactory;
-import titan.ccp.common.configuration.Configurations;
+import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;
-import java.io.IOException;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.util.Properties;

 /**
- * The History Microservice Flink Job.
+ * The History microservice implemented as a Flink job.
  */
 public class HistoryServiceFlinkJob {

   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);

-  private final Configuration config = Configurations.create();
+  private final Configuration config = ServiceConfigurations.createWithDefaults();

   private void run() {
     // Configurations
@@ -61,11 +59,17 @@ public class HistoryServiceFlinkJob {
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final String timeZoneString = this.config.getString(ConfigurationKeys.TIME_ZONE);
     final ZoneId timeZone = ZoneId.of(timeZoneString);
-    final Time aggregationDuration = Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
-    final Time aggregationAdvance = Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
-    final String stateBackend = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
-    final String stateBackendPath = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
-    final int memoryStateBackendSize = this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE, MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
+    final Time aggregationDuration =
+        Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
+    final Time aggregationAdvance =
+        Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
+    final String stateBackend =
+        this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
+    final String stateBackendPath = this.config
+        .getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
+    final int memoryStateBackendSize =
+        this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
+            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);

     final Properties kafkaProps = new Properties();
@@ -83,16 +87,16 @@ public class HistoryServiceFlinkJob {
         inputTopic, sourceSerde, kafkaProps);
     kafkaSource.setStartFromGroupOffsets();
-    if (checkpointing)
+    if (checkpointing) {
       kafkaSource.setCommitOffsetsOnCheckpoints(true);
+    }
     kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

     final FlinkKafkaKeyValueSerde<String, String> sinkSerde =
         new FlinkKafkaKeyValueSerde<>(outputTopic,
             Serdes::String,
             Serdes::String,
-            TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})
-        );
+            TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));
     final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>(
         outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
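For context, FlinkKafkaKeyValueSerde comes from theodolite.commons.flink and is not part of this diff. A minimal sketch of what such a serde presumably does on the producer side, using only stock Flink and Kafka APIs; the class name and wiring here are illustrative assumptions, not the project's actual implementation:

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical stand-in for the producer side of FlinkKafkaKeyValueSerde.
public class StringTupleSerializationSchema
    implements KafkaSerializationSchema<Tuple2<String, String>> {

  private static final long serialVersionUID = 1L; // NOPMD

  private final String topic;

  public StringTupleSerializationSchema(final String topic) {
    this.topic = topic;
  }

  @Override
  public ProducerRecord<byte[], byte[]> serialize(
      final Tuple2<String, String> element, final Long timestamp) {
    // f0 becomes the Kafka record key, f1 the record value.
    return new ProducerRecord<>(
        this.topic,
        element.f0.getBytes(StandardCharsets.UTF_8),
        element.f1.getBytes(StandardCharsets.UTF_8));
  }
}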
@@ -103,8 +107,9 @@ public class HistoryServiceFlinkJob {
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-    if (checkpointing)
+    if (checkpointing) {
       env.enableCheckpointing(commitIntervalMs);
+    }

     // State Backend
     if (stateBackend.equals("filesystem")) {
@@ -112,8 +117,8 @@ public class HistoryServiceFlinkJob {
     } else if (stateBackend.equals("rocksdb")) {
       try {
         env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
-      } catch (IOException e) {
-        e.printStackTrace();
+      } catch (final IOException e) {
+        LOGGER.error("Cannot create RocksDB state backend.", e);
       }
     } else {
       env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
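Together with the collapsed "filesystem" branch above it, this block selects one of Flink's three classic (pre-1.13) state backends from configuration. A condensed sketch of that selection as a standalone helper; the FsStateBackend call is an assumption, since that branch is hidden in this view:

import java.io.IOException;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch: the state backend selection shown above, gathered into one helper.
public final class StateBackendConfigurator {

  private static final Logger LOGGER = LoggerFactory.getLogger(StateBackendConfigurator.class);

  private StateBackendConfigurator() {}

  public static void apply(final StreamExecutionEnvironment env, final String stateBackend,
      final String stateBackendPath, final int memoryStateBackendSize) {
    if ("filesystem".equals(stateBackend)) {
      // Collapsed in the diff above; presumably a checkpoint directory on a shared filesystem.
      env.setStateBackend(new FsStateBackend(stateBackendPath));
    } else if ("rocksdb".equals(stateBackend)) {
      try {
        // The second argument enables incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
      } catch (final IOException e) {
        LOGGER.error("Cannot create RocksDB state backend.", e);
      }
    } else {
      env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
    }
  }
}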
@@ -122,10 +127,11 @@ public class HistoryServiceFlinkJob {
     // Kryo serializer registration
     env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class, new HourOfDayKeySerde());
     env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
-    env.getConfig().getRegisteredTypesWithKryoSerializers().forEach((c, s) ->
-        LOGGER.info("Class " + c.getName() + " registered with serializer "
-            + s.getSerializer().getClass().getName()));
+    for (final var entry : env.getConfig().getRegisteredTypesWithKryoSerializers().entrySet()) {
+      LOGGER.info("Class {} registered with serializer {}.",
+          entry.getKey().getName(),
+          entry.getValue().getSerializer().getClass().getName());
+    }

     // Streaming topology
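StatsSerializer itself is not shown in this commit. A plausible shape for a Kryo serializer of Guava's Stats, assuming it round-trips through Stats#toByteArray() and Stats#fromByteArray(); this is hypothetical, and the real theodolite.commons.flink.serialization.StatsSerializer may differ:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.math.Stats;
import java.io.Serializable;

// Flink requires serializers passed to registerTypeWithKryoSerializer(Class, T)
// to be Serializable themselves, hence the marker interface.
public class StatsKryoSerializer extends Serializer<Stats> implements Serializable {

  private static final long serialVersionUID = 1L; // NOPMD

  @Override
  public void write(final Kryo kryo, final Output output, final Stats object) {
    final byte[] bytes = object.toByteArray();
    output.writeInt(bytes.length); // length prefix tells read() how much to consume
    output.writeBytes(bytes);
  }

  @Override
  public Stats read(final Kryo kryo, final Input input, final Class<Stats> type) {
    final int length = input.readInt();
    return Stats.fromByteArray(input.readBytes(length));
  }
}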
@@ -145,7 +151,7 @@ public class HistoryServiceFlinkJob {
         .aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction())
         .map(new MapFunction<Tuple2<HourOfDayKey, Stats>, Tuple2<String, String>>() {
           @Override
-          public Tuple2<String, String> map(Tuple2<HourOfDayKey, Stats> tuple) {
+          public Tuple2<String, String> map(final Tuple2<HourOfDayKey, Stats> tuple) {
             final String newKey = keyFactory.getSensorId(tuple.f0);
             final String newValue = tuple.f1.toString();
             final int hourOfDay = tuple.f0.getHourOfDay();
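The anonymous MapFunction above unpacks the composite window key: keyFactory.getSensorId(tuple.f0) recovers the sensor identifier and tuple.f0.getHourOfDay() the hour slot. The real HourOfDayKey lives in theodolite.uc3.application.util; below is a hypothetical sketch with field and method names inferred purely from their usage here:

// Hypothetical composite key pairing a sensor id with an hour of day (0-23),
// so records can be grouped per sensor and hour across days.
public class HourOfDayKeySketch {

  private final int hourOfDay;
  private final String sensorId;

  public HourOfDayKeySketch(final int hourOfDay, final String sensorId) {
    this.hourOfDay = hourOfDay;
    this.sensorId = sensorId;
  }

  public int getHourOfDay() {
    return this.hourOfDay;
  }

  public String getSensorId() {
    return this.sensorId;
  }
}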
@@ -156,17 +162,14 @@ public class HistoryServiceFlinkJob {
         .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);

     // Execution plan
-    LOGGER.info("Execution Plan: {}", env.getExecutionPlan());
+    if (LOGGER.isInfoEnabled()) {
+      LOGGER.info("Execution Plan: " + env.getExecutionPlan());
+    }

     // Execute Job
     try {
       env.execute(applicationId);
-    } catch (Exception e) {
-      e.printStackTrace();
+    } catch (final Exception e) { // NOPMD Execution thrown by Flink
+      LOGGER.error("An error occured while running this job.", e);
     }
   }
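The logging change above is the usual PMD GuardLogStatement fix: env.getExecutionPlan() is evaluated eagerly even in a parameterized LOGGER.info call, so the new code only computes it when INFO is actually enabled. A minimal self-contained illustration of the pattern:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class GuardedLoggingExample {

  private static final Logger LOGGER = LoggerFactory.getLogger(GuardedLoggingExample.class);

  private GuardedLoggingExample() {}

  public static void main(final String[] args) {
    // Eager: expensivePlan() runs even when INFO logging is disabled.
    LOGGER.info("Execution Plan: {}", expensivePlan());

    // Guarded: expensivePlan() runs only when the message will actually be logged.
    if (LOGGER.isInfoEnabled()) {
      LOGGER.info("Execution Plan: " + expensivePlan());
    }
  }

  private static String expensivePlan() {
    return "...plan JSON..."; // stands in for env.getExecutionPlan()
  }
}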
@@ -7,7 +7,10 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 import theodolite.uc3.application.util.HourOfDayKey;

-public class HourOfDayProcessWindowFunction extends ProcessWindowFunction<Stats, Tuple2<HourOfDayKey, Stats>, HourOfDayKey, TimeWindow> {
+public class HourOfDayProcessWindowFunction
+    extends ProcessWindowFunction<Stats, Tuple2<HourOfDayKey, Stats>, HourOfDayKey, TimeWindow> {
+
+  private static final long serialVersionUID = 7702216563302727315L; // NOPMD

   @Override
   public void process(final HourOfDayKey hourOfDayKey,
@@ -9,10 +9,9 @@ import titan.ccp.model.records.ActivePowerRecord;
 /**
  * Statistical aggregation of {@link ActivePowerRecord}s using {@link Stats}.
  */
-@SuppressWarnings("UnstableApiUsage")
 public class StatsAggregateFunction implements AggregateFunction<ActivePowerRecord, Stats, Stats> {

-  private static final long serialVersionUID = -8873572990921515499L;
+  private static final long serialVersionUID = -8873572990921515499L; // NOPMD

   @Override
   public Stats createAccumulator() {
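Only the class declaration and serialVersionUID of StatsAggregateFunction are visible in this hunk. For orientation, a hedged sketch of how such an AggregateFunction presumably folds ActivePowerRecord values into Guava Stats; the StatsAccumulator round-trip and the getValueInW() accessor are assumptions, not the project's confirmed code:

import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import org.apache.flink.api.common.functions.AggregateFunction;
import titan.ccp.model.records.ActivePowerRecord;

// Sketch: accumulate power readings into immutable Guava Stats snapshots.
public class StatsAggregateFunctionSketch
    implements AggregateFunction<ActivePowerRecord, Stats, Stats> {

  private static final long serialVersionUID = 1L; // NOPMD

  @Override
  public Stats createAccumulator() {
    return Stats.of(); // empty statistics
  }

  @Override
  public Stats add(final ActivePowerRecord value, final Stats accumulator) {
    // Stats is immutable, so rebuild via a mutable StatsAccumulator.
    final StatsAccumulator acc = new StatsAccumulator();
    acc.addAll(accumulator);
    acc.add(value.getValueInW()); // assumed accessor on the Avro record
    return acc.snapshot();
  }

  @Override
  public Stats getResult(final Stats accumulator) {
    return accumulator;
  }

  @Override
  public Stats merge(final Stats a, final Stats b) {
    final StatsAccumulator acc = new StatsAccumulator();
    acc.addAll(a);
    acc.addAll(b);
    return acc.snapshot();
  }
}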