From bd851a5acb3eabd96b27c9f00ecee509c76883f2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Tue, 9 Mar 2021 17:42:23 +0100
Subject: [PATCH] Fix some code style issues in UC2

---
 .../uc2/application/ConfigurationKeys.java    |  8 +--
 .../application/HistoryServiceFlinkJob.java   | 62 +++++++++----------
 .../application/StatsAggregateFunction.java   |  3 +-
 .../StatsProcessWindowFunction.java           |  8 ++-
 4 files changed, 41 insertions(+), 40 deletions(-)

diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java
index 72e07ffee..9ba56c828 100644
--- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java
+++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java
@@ -14,7 +14,7 @@ public final class ConfigurationKeys {
   public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
 
   public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
-  
+
   public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
 
   public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
@@ -25,11 +25,11 @@ public final class ConfigurationKeys {
 
   public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";
 
-  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = "flink.state.backend.memory.size";
+  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD
+      "flink.state.backend.memory.size";
 
   public static final String CHECKPOINTING = "checkpointing";
 
-  private ConfigurationKeys() {
-  }
+  private ConfigurationKeys() {}
 
 }
diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
index 4ae68be71..9ee64ad2f 100644
--- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
@@ -1,19 +1,18 @@
 package theodolite.uc2.application;
 
 import com.google.common.math.Stats;
+import java.io.IOException;
+import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
-import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
-import org.apache.flink.formats.avro.typeutils.AvroSerializer;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -29,22 +28,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
 import theodolite.commons.flink.serialization.StatsSerializer;
-import theodolite.uc2.application.ConfigurationKeys;
-import titan.ccp.common.configuration.Configurations;
+import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;
-import java.io.IOException;
-import java.util.Properties;
-
 
 /**
- * The History Microservice Flink Job.
+ * The History microservice implemented as a Flink job.
  */
 public class HistoryServiceFlinkJob {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
 
-  private final Configuration config = Configurations.create();
+  private final Configuration config = ServiceConfigurations.createWithDefaults();
 
   private void run() {
     final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
@@ -56,35 +51,39 @@ public class HistoryServiceFlinkJob {
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final int windowDuration = this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
-    final String stateBackend = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
-    final String stateBackendPath = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
-    final int memoryStateBackendSize = this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE, MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
-    final boolean checkpointing= this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
+    final String stateBackend =
+        this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
+    final String stateBackendPath = this.config
+        .getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
+    final int memoryStateBackendSize =
+        this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
+            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
     final Properties kafkaProps = new Properties();
     kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
     kafkaProps.setProperty("group.id", applicationId);
-    
+
     final DeserializationSchema<ActivePowerRecord> sourceSerde =
-            ConfluentRegistryAvroDeserializationSchema.forSpecific(
-                ActivePowerRecord.class,
-                schemaRegistryUrl);
+        ConfluentRegistryAvroDeserializationSchema.forSpecific(
+            ActivePowerRecord.class,
+            schemaRegistryUrl);
 
     final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>(
         inputTopic, sourceSerde, kafkaProps);
 
     kafkaSource.setStartFromGroupOffsets();
-    if (checkpointing)
+    if (checkpointing) {
       kafkaSource.setCommitOffsetsOnCheckpoints(true);
+    }
     kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
 
     final KafkaSerializationSchema<Tuple2<String, String>> sinkSerde =
         new FlinkKafkaKeyValueSerde<>(outputTopic,
             Serdes::String,
             Serdes::String,
-            TypeInformation.of(new TypeHint<Tuple2<String, String>>(){})
-        );
-    kafkaProps.setProperty("transaction.timeout.ms", ""+5*60*1000);
+            TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));
+    kafkaProps.setProperty("transaction.timeout.ms", "" + 5 * 60 * 1000);
     final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>(
         outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
     kafkaSink.setWriteTimestampToKafka(true);
@@ -92,8 +91,9 @@ public class HistoryServiceFlinkJob {
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
-    if (checkpointing)
+    if (checkpointing) {
       env.enableCheckpointing(commitIntervalMs);
+    }
 
     // State Backend
     if (stateBackend.equals("filesystem")) {
@@ -101,8 +101,8 @@
     } else if (stateBackend.equals("rocksdb")) {
       try {
         env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
-      } catch (IOException e) {
-        e.printStackTrace();
+      } catch (final IOException e) {
+        LOGGER.error("Cannot create RocksDB state backend.", e);
       }
     } else {
       env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
@@ -110,8 +110,8 @@
 
     env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
 
-    env.getConfig().getRegisteredTypesWithKryoSerializers().forEach((c, s) ->
-        LOGGER.info("Class " + c.getName() + " registered with serializer "
+    env.getConfig().getRegisteredTypesWithKryoSerializers()
+        .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
             + s.getSerializer().getClass().getName()));
 
     final DataStream<ActivePowerRecord> stream = env.addSource(kafkaSource)
@@ -124,10 +124,10 @@
         .aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction())
         .map(new MapFunction<Tuple2<String, Stats>, Tuple2<String, String>>() {
           @Override
-          public Tuple2<String, String> map(Tuple2<String, Stats> t) {
+          public Tuple2<String, String> map(final Tuple2<String, Stats> t) {
             final String key = t.f0;
             final String value = t.f1.toString();
-            LOGGER.info(key + ": " + value);
+            LOGGER.info("{}: {}", key, value);
             return new Tuple2<>(key, value);
           }
         }).name("map")
@@ -139,8 +139,8 @@
 
     try {
       env.execute(applicationId);
-    } catch (Exception e) {
-      e.printStackTrace();
+    } catch (final Exception e) { // NOPMD Execution thrown by Flink
+      LOGGER.error("An error occurred while running this job.", e);
     }
   }
 
diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsAggregateFunction.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsAggregateFunction.java
index 5c90d9bb7..7bd090de8 100644
--- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsAggregateFunction.java
+++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsAggregateFunction.java
@@ -9,10 +9,9 @@ import titan.ccp.model.records.ActivePowerRecord;
 /**
  * Statistical aggregation of {@link ActivePowerRecord}s using {@link Stats}.
  */
-@SuppressWarnings("UnstableApiUsage")
 public class StatsAggregateFunction implements AggregateFunction<ActivePowerRecord, Stats, Stats> {
 
-  private static final long serialVersionUID = -8873572990921515499L;
+  private static final long serialVersionUID = -8873572990921515499L; // NOPMD
 
   @Override
   public Stats createAccumulator() {
diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsProcessWindowFunction.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsProcessWindowFunction.java
index fb49f46c4..a5c370eed 100644
--- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsProcessWindowFunction.java
+++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsProcessWindowFunction.java
@@ -6,12 +6,14 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 
-public class StatsProcessWindowFunction extends ProcessWindowFunction<Stats, Tuple2<String, Stats>, String, TimeWindow> {
+public class StatsProcessWindowFunction
+    extends ProcessWindowFunction<Stats, Tuple2<String, Stats>, String, TimeWindow> {
 
-  private static final long serialVersionUID = 4363099880614593379L;
+  private static final long serialVersionUID = 4363099880614593379L; // NOPMD
 
   @Override
-  public void process(String key, Context context, Iterable<Stats> elements, Collector<Tuple2<String, Stats>> out) {
+  public void process(final String key, final Context context, final Iterable<Stats> elements,
+      final Collector<Tuple2<String, Stats>> out) {
     final Stats stats = elements.iterator().next();
     out.collect(new Tuple2<>(key, stats));
   }
-- 
GitLab
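
Editor's note (not part of the patch): besides formatting, two of the fixes above also change runtime behavior. Replacing e.printStackTrace() with LOGGER.error(...) routes failures through the configured logging backend, and LOGGER.info("{}: {}", key, value) switches to SLF4J's parameterized logging, which skips building the message string when the INFO level is disabled. A minimal, self-contained sketch of that pattern follows; the class name and values are illustrative only and do not appear in the patch:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public final class LoggingSketch {

      private static final Logger LOGGER = LoggerFactory.getLogger(LoggingSketch.class);

      public static void main(final String[] args) {
        final String key = "sensor1";      // hypothetical record key
        final String value = "mean=42.0";  // hypothetical aggregate value
        // String concatenation would build the message even if INFO is disabled:
        //   LOGGER.info(key + ": " + value);
        // The parameterized form defers formatting until the level check passes:
        LOGGER.info("{}: {}", key, value);
      }
    }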