diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java
index 00e9202899fdabd4009ad10c0e357442a013e658..a895c74d89c5d788c47b3b78dc70500b4b5a6f5b 100644
--- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java
@@ -14,7 +14,7 @@ public final class ConfigurationKeys {
   public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
 
   public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
-  
+
   public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
 
   public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
@@ -29,7 +29,8 @@ public final class ConfigurationKeys {
 
   public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";
 
-  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = "flink.state.backend.memory.size";
+  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD
+      "flink.state.backend.memory.size";
 
   public static final String CHECKPOINTING = "checkpointing";
 
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
index 900c375148d2dc2e69285d72a5e0150e30346d4e..31b840924afa34243fe0e8afafae5ff862e9a61b 100644
--- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
@@ -1,6 +1,11 @@
 package theodolite.uc3.application;
 
 import com.google.common.math.Stats;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -25,29 +30,22 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
 import theodolite.commons.flink.serialization.StatsSerializer;
-import theodolite.uc3.application.ConfigurationKeys;
 import theodolite.uc3.application.util.HourOfDayKey;
 import theodolite.uc3.application.util.HourOfDayKeyFactory;
 import theodolite.uc3.application.util.HourOfDayKeySerde;
 import theodolite.uc3.application.util.StatsKeyFactory;
-import titan.ccp.common.configuration.Configurations;
+import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;
 
-import java.io.IOException;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.util.Properties;
-
 /**
- * The History Microservice Flink Job.
+ * The History microservice implemented as a Flink job.
  */
 public class HistoryServiceFlinkJob {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
 
-  private final Configuration config = Configurations.create();
+  private final Configuration config = ServiceConfigurations.createWithDefaults();
 
   private void run() {
     // Configurations
@@ -61,38 +59,44 @@ public class HistoryServiceFlinkJob {
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final String timeZoneString = this.config.getString(ConfigurationKeys.TIME_ZONE);
     final ZoneId timeZone = ZoneId.of(timeZoneString);
-    final Time aggregationDuration = Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
-    final Time aggregationAdvance = Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
-    final String stateBackend = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
-    final String stateBackendPath = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
-    final int memoryStateBackendSize = this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE, MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
-    final boolean checkpointing= this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
+    final Time aggregationDuration =
+        Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
+    final Time aggregationAdvance =
+        Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
+    final String stateBackend =
+        this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
+    final String stateBackendPath = this.config
+        .getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
+    final int memoryStateBackendSize =
+        this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
+            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
     final Properties kafkaProps = new Properties();
     kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
     kafkaProps.setProperty("group.id", applicationId);
 
     // Sources and Sinks with Serializer and Deserializer
-    
+
     final DeserializationSchema<ActivePowerRecord> sourceSerde =
-            ConfluentRegistryAvroDeserializationSchema.forSpecific(
-                ActivePowerRecord.class,
-                schemaRegistryUrl);
+        ConfluentRegistryAvroDeserializationSchema.forSpecific(
+            ActivePowerRecord.class,
+            schemaRegistryUrl);
 
     final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>(
         inputTopic, sourceSerde, kafkaProps);
 
     kafkaSource.setStartFromGroupOffsets();
-    if (checkpointing)
+    if (checkpointing) {
       kafkaSource.setCommitOffsetsOnCheckpoints(true);
+    }
     kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
 
     final FlinkKafkaKeyValueSerde<String, String> sinkSerde =
         new FlinkKafkaKeyValueSerde<>(outputTopic,
             Serdes::String,
             Serdes::String,
-            TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})
-        );
+            TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));
     final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>(
         outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
 
 
@@ -103,8 +107,9 @@ public class HistoryServiceFlinkJob {
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
-    if (checkpointing)
+    if (checkpointing) {
       env.enableCheckpointing(commitIntervalMs);
+    }
 
     // State Backend
     if (stateBackend.equals("filesystem")) {
@@ -112,8 +117,8 @@ public class HistoryServiceFlinkJob {
     } else if (stateBackend.equals("rocksdb")) {
       try {
         env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
-      } catch (IOException e) {
-        e.printStackTrace();
+      } catch (final IOException e) {
+        LOGGER.error("Cannot create RocksDB state backend.", e);
       }
     } else {
       env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
@@ -122,10 +127,11 @@ public class HistoryServiceFlinkJob {
     // Kryo serializer registration
     env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class, new HourOfDayKeySerde());
     env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
-
-    env.getConfig().getRegisteredTypesWithKryoSerializers().forEach((c, s) ->
-        LOGGER.info("Class " + c.getName() + " registered with serializer "
-            + s.getSerializer().getClass().getName()));
+    for (final var entry : env.getConfig().getRegisteredTypesWithKryoSerializers().entrySet()) {
+      LOGGER.info("Class {} registered with serializer {}.",
+          entry.getKey().getName(),
+          entry.getValue().getSerializer().getClass().getName());
+    }
 
 
     // Streaming topology
@@ -145,7 +151,7 @@ public class HistoryServiceFlinkJob {
         .aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction())
         .map(new MapFunction<Tuple2<HourOfDayKey, Stats>, Tuple2<String, String>>() {
           @Override
-          public Tuple2<String, String> map(Tuple2<HourOfDayKey, Stats> tuple) {
+          public Tuple2<String, String> map(final Tuple2<HourOfDayKey, Stats> tuple) {
             final String newKey = keyFactory.getSensorId(tuple.f0);
             final String newValue = tuple.f1.toString();
             final int hourOfDay = tuple.f0.getHourOfDay();
@@ -156,17 +162,14 @@ public class HistoryServiceFlinkJob {
         .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
 
     // Execution plan
-
-    if (LOGGER.isInfoEnabled()) {
-      LOGGER.info("Execution Plan: " + env.getExecutionPlan());
-    }
+    LOGGER.info("Execution Plan: {}", env.getExecutionPlan());
 
     // Execute Job
     try {
       env.execute(applicationId);
-    } catch (Exception e) {
-      e.printStackTrace();
+    } catch (final Exception e) { // NOPMD Exception thrown by Flink
+      LOGGER.error("An error occurred while running this job.", e);
     }
   }
 
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HourOfDayProcessWindowFunction.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HourOfDayProcessWindowFunction.java
index d5edf2dce68b787e1d6d7df72251211691c4fabf..389b0e4a22966995731988f5010ed3ef7e8b209d 100644
--- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HourOfDayProcessWindowFunction.java
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HourOfDayProcessWindowFunction.java
@@ -7,15 +7,18 @@
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 import theodolite.uc3.application.util.HourOfDayKey;
 
-public class HourOfDayProcessWindowFunction extends ProcessWindowFunction<Stats, Tuple2<HourOfDayKey, Stats>, HourOfDayKey, TimeWindow> {
+public class HourOfDayProcessWindowFunction
+    extends ProcessWindowFunction<Stats, Tuple2<HourOfDayKey, Stats>, HourOfDayKey, TimeWindow> {
+
+  private static final long serialVersionUID = 7702216563302727315L; // NOPMD
 
   @Override
   public void process(final HourOfDayKey hourOfDayKey,
-                      final Context context,
-                      final Iterable<Stats> elements,
-                      final Collector<Tuple2<HourOfDayKey, Stats>> out) {
+      final Context context,
+      final Iterable<Stats> elements,
+      final Collector<Tuple2<HourOfDayKey, Stats>> out) {
     final Stats stats = elements.iterator().next();
     out.collect(new Tuple2<>(hourOfDayKey, stats));
   }
-}
\ No newline at end of file
+}
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/StatsAggregateFunction.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/StatsAggregateFunction.java
index 277f92b5054aa41cae8953d62f0f6975e9ddda87..4706da0a9491e0391f25cd61639c3bb565509cb1 100644
--- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/StatsAggregateFunction.java
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/StatsAggregateFunction.java
@@ -9,10 +9,9 @@ import titan.ccp.model.records.ActivePowerRecord;
 /**
  * Statistical aggregation of {@link ActivePowerRecord}s using {@link Stats}.
  */
-@SuppressWarnings("UnstableApiUsage")
 public class StatsAggregateFunction implements AggregateFunction<ActivePowerRecord, Stats, Stats> {
 
-  private static final long serialVersionUID = -8873572990921515499L;
+  private static final long serialVersionUID = -8873572990921515499L; // NOPMD
 
   @Override
   public Stats createAccumulator() {