diff --git a/theodolite-benchmarks/uc4-application-flink/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc4-application-flink/.settings/org.eclipse.jdt.ui.prefs
index 531483c9519de61915e88f8c9b88c352a8e8b4b0..272e01533f6a345d53d2635c47e38c6d3c33dc8a 100644
--- a/theodolite-benchmarks/uc4-application-flink/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/uc4-application-flink/.settings/org.eclipse.jdt.ui.prefs
@@ -59,5 +59,70 @@ cleanup.use_this_for_non_static_method_access_only_if_necessary=false
 cleanup_profile=_CAU-SE-Style
 cleanup_settings_version=2
 eclipse.preferences.version=1
+editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
 formatter_settings_version=15
+org.eclipse.jdt.ui.ignorelowercasenames=true
+org.eclipse.jdt.ui.importorder=;
+org.eclipse.jdt.ui.ondemandthreshold=99
+org.eclipse.jdt.ui.staticondemandthreshold=99
+org.eclipse.jdt.ui.text.custom_code_templates=
+sp_cleanup.add_default_serial_version_id=true
+sp_cleanup.add_generated_serial_version_id=false
+sp_cleanup.add_missing_annotations=true
+sp_cleanup.add_missing_deprecated_annotations=true
+sp_cleanup.add_missing_methods=false
+sp_cleanup.add_missing_nls_tags=false
+sp_cleanup.add_missing_override_annotations=true
+sp_cleanup.add_missing_override_annotations_interface_methods=true
+sp_cleanup.add_serial_version_id=false
+sp_cleanup.always_use_blocks=true
+sp_cleanup.always_use_parentheses_in_expressions=false
+sp_cleanup.always_use_this_for_non_static_field_access=true
+sp_cleanup.always_use_this_for_non_static_method_access=true
+sp_cleanup.convert_functional_interfaces=false
+sp_cleanup.convert_to_enhanced_for_loop=true
+sp_cleanup.correct_indentation=true
+sp_cleanup.format_source_code=true
+sp_cleanup.format_source_code_changes_only=false
+sp_cleanup.insert_inferred_type_arguments=false
+sp_cleanup.make_local_variable_final=true
+sp_cleanup.make_parameters_final=true
+sp_cleanup.make_private_fields_final=true
+sp_cleanup.make_type_abstract_if_missing_method=false
+sp_cleanup.make_variable_declarations_final=true
+sp_cleanup.never_use_blocks=false
+sp_cleanup.never_use_parentheses_in_expressions=true
+sp_cleanup.on_save_use_additional_actions=true
+sp_cleanup.organize_imports=true
+sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
+sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
+sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
+sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
+sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
+sp_cleanup.remove_private_constructors=true
+sp_cleanup.remove_redundant_modifiers=true
+sp_cleanup.remove_redundant_semicolons=true
+sp_cleanup.remove_redundant_type_arguments=true
+sp_cleanup.remove_trailing_whitespaces=true
+sp_cleanup.remove_trailing_whitespaces_all=true
+sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
+sp_cleanup.remove_unnecessary_casts=true
+sp_cleanup.remove_unnecessary_nls_tags=true
+sp_cleanup.remove_unused_imports=true
+sp_cleanup.remove_unused_local_variables=false
+sp_cleanup.remove_unused_private_fields=true
+sp_cleanup.remove_unused_private_members=false
+sp_cleanup.remove_unused_private_methods=true
+sp_cleanup.remove_unused_private_types=true
+sp_cleanup.sort_members=false
+sp_cleanup.sort_members_all=false
+sp_cleanup.use_anonymous_class_creation=false
+sp_cleanup.use_blocks=true
+sp_cleanup.use_blocks_only_for_return_and_throw=false
+sp_cleanup.use_lambda=true
+sp_cleanup.use_parentheses_in_expressions=true
+sp_cleanup.use_this_for_non_static_field_access=true
+sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
+sp_cleanup.use_this_for_non_static_method_access=true
+sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
index bb835eddb3aacef9fe4253dd710c015649573d16..46b312197f1231ac380e0c6699488fd0c9a1effc 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
@@ -1,6 +1,9 @@
 package theodolite.uc4.application;
 
-import org.apache.avro.specific.SpecificRecord;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.Set;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -11,7 +14,6 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
-import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -21,38 +23,31 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
-import theodolite.uc4.application.ConfigurationKeys;
 import theodolite.uc4.application.util.ImmutableSensorRegistrySerializer;
 import theodolite.uc4.application.util.ImmutableSetSerializer;
 import theodolite.uc4.application.util.SensorParentKey;
 import theodolite.uc4.application.util.SensorParentKeySerializer;
-import titan.ccp.common.configuration.Configurations;
+import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
 import titan.ccp.configuration.events.Event;
 import titan.ccp.configuration.events.EventSerde;
-import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
-import titan.ccp.model.sensorregistry.SensorRegistry;
 import titan.ccp.model.records.ActivePowerRecord;
 import titan.ccp.model.records.AggregatedActivePowerRecord;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Properties;
-import java.util.Set;
+import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
+import titan.ccp.model.sensorregistry.SensorRegistry;
 
 /**
- * The Aggregation Microservice Flink Job.
+ * The Aggregation microservice implemented as a Flink job.
  */
 public class AggregationServiceFlinkJob {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(AggregationServiceFlinkJob.class);
 
-  private final Configuration config = Configurations.create();
+  private final Configuration config = ServiceConfigurations.createWithDefaults();
 
   private void run() {
     // Configurations
@@ -64,12 +59,19 @@ public class AggregationServiceFlinkJob {
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
-    final Time windowSize = Time.milliseconds(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS));
-    final Duration windowGrace = Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS));
-    final String configurationTopic = this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC);
-    final String stateBackend = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
-    final String stateBackendPath = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
-    final int memoryStateBackendSize = this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE, MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
+    final Time windowSize =
+        Time.milliseconds(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS));
+    final Duration windowGrace =
+        Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS));
+    final String configurationTopic =
+        this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC);
+    final String stateBackend =
+        this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
+    final String stateBackendPath = this.config
+        .getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
+    final int memoryStateBackendSize =
+        this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
+            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
     final boolean debug = this.config.getBoolean(ConfigurationKeys.DEBUG, true);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
@@ -79,11 +81,11 @@ public class AggregationServiceFlinkJob {
     // Sources and Sinks with Serializer and Deserializer
 
-    // Source from input topic with ActivePowerRecords 
+    // Source from input topic with ActivePowerRecords
     final DeserializationSchema<ActivePowerRecord> inputSerde =
-          ConfluentRegistryAvroDeserializationSchema.forSpecific(
-              ActivePowerRecord.class,
-              schemaRegistryUrl);
+        ConfluentRegistryAvroDeserializationSchema.forSpecific(
+            ActivePowerRecord.class,
+            schemaRegistryUrl);
 
     final FlinkKafkaConsumer<ActivePowerRecord> kafkaInputSource = new FlinkKafkaConsumer<>(
         inputTopic, inputSerde, kafkaProps);
@@ -95,12 +97,13 @@ public class AggregationServiceFlinkJob {
     // Source from output topic with AggregatedPowerRecords
     final DeserializationSchema<AggregatedActivePowerRecord> outputSerde =
-          ConfluentRegistryAvroDeserializationSchema.forSpecific(
-              AggregatedActivePowerRecord.class,
-              schemaRegistryUrl);
+        ConfluentRegistryAvroDeserializationSchema.forSpecific(
+            AggregatedActivePowerRecord.class,
+            schemaRegistryUrl);
 
-    final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource = new FlinkKafkaConsumer<>(
-        outputTopic, outputSerde, kafkaProps);
+    final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource =
+        new FlinkKafkaConsumer<>(
+            outputTopic, outputSerde, kafkaProps);
 
     kafkaOutputSource.setStartFromGroupOffsets();
     if (checkpointing) {
@@ -113,8 +116,7 @@ public class AggregationServiceFlinkJob {
         configurationTopic,
         EventSerde::serde,
         Serdes::String,
-        TypeInformation.of(new TypeHint<Tuple2<Event, String>>() {
-        }));
+        TypeInformation.of(new TypeHint<Tuple2<Event, String>>() {}));
 
     final FlinkKafkaConsumer<Tuple2<Event, String>> kafkaConfigSource = new FlinkKafkaConsumer<>(
         configurationTopic, configSerde, kafkaProps);
@@ -124,13 +126,12 @@ public class AggregationServiceFlinkJob {
     }
 
     // Sink to output topic with SensorId, AggregatedActivePowerRecord
-    FlinkKafkaKeyValueSerde<String, AggregatedActivePowerRecord> aggregationSerde =
+    final FlinkKafkaKeyValueSerde<String, AggregatedActivePowerRecord> aggregationSerde =
         new FlinkKafkaKeyValueSerde<>(
             outputTopic,
             Serdes::String,
            () -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(),
-            TypeInformation.of(new TypeHint<Tuple2<String, AggregatedActivePowerRecord>>() {
-            }));
+            TypeInformation.of(new TypeHint<Tuple2<String, AggregatedActivePowerRecord>>() {}));
 
     final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink =
         new FlinkKafkaProducer<>(
@@ -141,9 +142,11 @@ public class AggregationServiceFlinkJob {
     kafkaAggregationSink.setWriteTimestampToKafka(true);
 
     // Execution environment configuration
-// org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration();
-// conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
-// final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
+    // org.apache.flink.configuration.Configuration conf = new
+    // org.apache.flink.configuration.Configuration();
+    // conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+    // final StreamExecutionEnvironment env =
+    // StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -157,7 +160,7 @@ public class AggregationServiceFlinkJob {
     } else if (stateBackend.equals("rocksdb")) {
       try {
         env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
-      } catch (IOException e) {
+      } catch (final IOException e) {
         e.printStackTrace();
       }
     } else {
@@ -165,15 +168,20 @@ public class AggregationServiceFlinkJob {
     }
 
     // Kryo serializer registration
-    env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class, new ImmutableSensorRegistrySerializer());
-    env.getConfig().registerTypeWithKryoSerializer(SensorParentKey.class, new SensorParentKeySerializer());
-
-    env.getConfig().registerTypeWithKryoSerializer(Set.of().getClass(), new ImmutableSetSerializer());
-    env.getConfig().registerTypeWithKryoSerializer(Set.of(1).getClass(), new ImmutableSetSerializer());
-    env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), new ImmutableSetSerializer());
-
-    env.getConfig().getRegisteredTypesWithKryoSerializers().forEach((c, s) ->
-        LOGGER.info("Class " + c.getName() + " registered with serializer "
+    env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class,
+        new ImmutableSensorRegistrySerializer());
+    env.getConfig().registerTypeWithKryoSerializer(SensorParentKey.class,
+        new SensorParentKeySerializer());
+
+    env.getConfig().registerTypeWithKryoSerializer(Set.of().getClass(),
+        new ImmutableSetSerializer());
+    env.getConfig().registerTypeWithKryoSerializer(Set.of(1).getClass(),
+        new ImmutableSetSerializer());
+    env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(),
+        new ImmutableSetSerializer());
+
+    env.getConfig().getRegisteredTypesWithKryoSerializers()
+        .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
             + s.getSerializer().getClass().getName()));
 
     // Streaming topology
@@ -200,12 +208,11 @@ public class AggregationServiceFlinkJob {
       mergedInputStream
           .map(new MapFunction<ActivePowerRecord, String>() {
             @Override
-            public String map(ActivePowerRecord value) throws Exception {
-              return
-                  "ActivePowerRecord { "
-                      + "identifier: " + value.getIdentifier() + ", "
-                      + "timestamp: " + value.getTimestamp() + ", "
-                      + "valueInW: " + value.getValueInW() + " }";
+            public String map(final ActivePowerRecord value) throws Exception {
+              return "ActivePowerRecord { "
+                  + "identifier: " + value.getIdentifier() + ", "
+                  + "timestamp: " + value.getTimestamp() + ", "
+                  + "valueInW: " + value.getValueInW() + " }";
             }
           })
           .name("[Map] toString")
@@ -216,13 +223,14 @@ public class AggregationServiceFlinkJob {
         env.addSource(kafkaConfigSource)
             .name("[Kafka Consumer] Topic: " + configurationTopic)
            .filter(tuple -> tuple.f0 == Event.SENSOR_REGISTRY_CHANGED
-                || tuple.f0 == Event.SENSOR_REGISTRY_STATUS).name("[Filter] SensorRegistry changed")
+                || tuple.f0 == Event.SENSOR_REGISTRY_STATUS)
+            .name("[Filter] SensorRegistry changed")
            .map(new MapFunction<Tuple2<Event, String>, SensorRegistry>() {
              @Override
-              public SensorRegistry map(Tuple2<Event, String> tuple) {
-// Gson gson = new GsonBuilder().setPrettyPrinting().create();
-// String prettyJsonString = gson.toJson(new JsonParser().parse(tuple.f1));
-// LOGGER.info("SensorRegistry: " + prettyJsonString);
+              public SensorRegistry map(final Tuple2<Event, String> tuple) {
+                // Gson gson = new GsonBuilder().setPrettyPrinting().create();
+                // String prettyJsonString = gson.toJson(new JsonParser().parse(tuple.f1));
+                // LOGGER.info("SensorRegistry: " + prettyJsonString);
                return SensorRegistry.fromJson(tuple.f1);
              }
            }).name("[Map] JSON -> SensorRegistry")
@@ -230,34 +238,34 @@ public class AggregationServiceFlinkJob {
            .flatMap(new ChildParentsFlatMapFunction())
            .name("[FlatMap] SensorRegistry -> (ChildSensor, ParentSensor[])");
 
-    DataStream<Tuple2<SensorParentKey, ActivePowerRecord>> lastValueStream =
+    final DataStream<Tuple2<SensorParentKey, ActivePowerRecord>> lastValueStream =
        mergedInputStream.connect(configurationsStream)
            .keyBy(ActivePowerRecord::getIdentifier,
-                ((KeySelector<Tuple2<String, Set<String>>, String>) t -> (String) t.f0))
+                (KeySelector<Tuple2<String, Set<String>>, String>) t -> t.f0)
            .flatMap(new JoinAndDuplicateCoFlatMapFunction())
            .name("[CoFlatMap] Join input-config, Flatten to ((Sensor, Group), ActivePowerRecord)");
 
-// KeyedStream<ActivePowerRecord, String> keyedStream =
-// mergedInputStream.keyBy(ActivePowerRecord::getIdentifier);
-//
-// MapStateDescriptor<String, Set<String>> sensorConfigStateDescriptor =
-// new MapStateDescriptor<>(
-// "join-and-duplicate-state",
-// BasicTypeInfo.STRING_TYPE_INFO,
-// TypeInformation.of(new TypeHint<Set<String>>() {}));
-//
-// BroadcastStream<Tuple2<String, Set<String>>> broadcastStream =
-// configurationsStream.keyBy(t -> t.f0).broadcast(sensorConfigStateDescriptor);
-//
-// DataStream<Tuple2<SensorParentKey, ActivePowerRecord>> lastValueStream =
-// keyedStream.connect(broadcastStream)
-// .process(new JoinAndDuplicateKeyedBroadcastProcessFunction());
+    // KeyedStream<ActivePowerRecord, String> keyedStream =
+    // mergedInputStream.keyBy(ActivePowerRecord::getIdentifier);
+    //
+    // MapStateDescriptor<String, Set<String>> sensorConfigStateDescriptor =
+    // new MapStateDescriptor<>(
+    // "join-and-duplicate-state",
+    // BasicTypeInfo.STRING_TYPE_INFO,
+    // TypeInformation.of(new TypeHint<Set<String>>() {}));
+    //
+    // BroadcastStream<Tuple2<String, Set<String>>> broadcastStream =
+    // configurationsStream.keyBy(t -> t.f0).broadcast(sensorConfigStateDescriptor);
+    //
+    // DataStream<Tuple2<SensorParentKey, ActivePowerRecord>> lastValueStream =
+    // keyedStream.connect(broadcastStream)
+    // .process(new JoinAndDuplicateKeyedBroadcastProcessFunction());
 
     if (debug) {
       lastValueStream
          .map(new MapFunction<Tuple2<SensorParentKey, ActivePowerRecord>, String>() {
            @Override
-            public String map(Tuple2<SensorParentKey, ActivePowerRecord> t) throws Exception {
+            public String map(final Tuple2<SensorParentKey, ActivePowerRecord> t) throws Exception {
              return "<" + t.f0.getSensor() + "|" + t.f0.getParent() + ">"
                  + "ActivePowerRecord {"
                  + "identifier: " + t.f1.getIdentifier() + ", "
                  + "timestamp: " + t.f1.getTimestamp() + ", "
@@ -267,7 +275,7 @@ public class AggregationServiceFlinkJob {
          .print();
     }
 
-    DataStream<AggregatedActivePowerRecord> aggregationStream = lastValueStream
+    final DataStream<AggregatedActivePowerRecord> aggregationStream = lastValueStream
        .rebalance()
        .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(windowGrace))
        .keyBy(t -> t.f0.getParent())
@@ -277,45 +285,44 @@ public class AggregationServiceFlinkJob {
 
     // add Kafka Sink
     aggregationStream
-        .map(new MapFunction<AggregatedActivePowerRecord, Tuple2<String, AggregatedActivePowerRecord>>() {
-          @Override
-          public Tuple2<String, AggregatedActivePowerRecord> map(AggregatedActivePowerRecord value) throws Exception {
-            return new Tuple2<>(value.getIdentifier(), value);
-          }
-        })
+        .map(
+            new MapFunction<AggregatedActivePowerRecord, Tuple2<String, AggregatedActivePowerRecord>>() {
+              @Override
+              public Tuple2<String, AggregatedActivePowerRecord> map(
+                  final AggregatedActivePowerRecord value) throws Exception {
+                return new Tuple2<>(value.getIdentifier(), value);
+              }
+            })
        .name("[Map] AggregatedActivePowerRecord -> (Sensor, AggregatedActivePowerRecord)")
        .addSink(kafkaAggregationSink).name("[Kafka Producer] Topic: " + outputTopic);
 
     // add stdout sink
     if (debug) {
-    aggregationStream
-        .map(new MapFunction<AggregatedActivePowerRecord, String>() {
-          @Override
-          public String map(AggregatedActivePowerRecord value) throws Exception {
-            return
-                "AggregatedActivePowerRecord { "
-                    + "identifier: " + value.getIdentifier() + ", "
-                    + "timestamp: " + value.getTimestamp() + ", "
-                    + "count: " + value.getCount() + ", "
-                    + "sumInW: " + value.getSumInW() + ", "
-                    + "avgInW: " + value.getAverageInW() + " }";
-          }
-        })
-        .name("[Map] toString")
-        .print();
+      aggregationStream
+          .map(new MapFunction<AggregatedActivePowerRecord, String>() {
+            @Override
+            public String map(final AggregatedActivePowerRecord value) throws Exception {
+              return "AggregatedActivePowerRecord { "
+                  + "identifier: " + value.getIdentifier() + ", "
+                  + "timestamp: " + value.getTimestamp() + ", "
+                  + "count: " + value.getCount() + ", "
+                  + "sumInW: " + value.getSumInW() + ", "
+                  + "avgInW: " + value.getAverageInW() + " }";
            }
          })
          .name("[Map] toString")
          .print();
     }
 
     // Execution plan
-    if (LOGGER.isInfoEnabled()) {
-      LOGGER.info("Execution Plan:\n" + env.getExecutionPlan());
-    }
+    LOGGER.info("Execution plan: {}", env.getExecutionPlan());
 
     // Execute Job
     try {
       env.execute(applicationId);
-    } catch (Exception e) {
-      e.printStackTrace();
+    } catch (final Exception e) { // NOPMD Execution thrown by Flink
+      LOGGER.error("An error occured while running this job.", e);
     }
   }
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java
index 8cfbaddc9440074d6e2ab0bd47b07a179ffa86ac..a7e5ac28ff08c17bbd7911c02cde7bee1316c823 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java
@@ -26,7 +26,7 @@ import java.util.stream.Stream;
  */
 public class ChildParentsFlatMapFunction
     extends RichFlatMapFunction<SensorRegistry, Tuple2<String, Set<String>>> {
 
-  private static final long serialVersionUID = 3969444219510915221L;
+  private static final long serialVersionUID = 3969444219510915221L; //NOPMD
 
   private transient MapState<String, Set<String>> state;
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java
index d0682a0fa4c91c8bb5257db45f03f96ff95ad296..6497f6b055ef115c4a681499c5fa38657bb5d29e 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java
@@ -28,7 +28,8 @@ public final class ConfigurationKeys {
 
   public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";
 
-  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = "flink.state.backend.memory.size";
+  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = //NOPMD
+      "flink.state.backend.memory.size";
 
   public static final String DEBUG = "debug";
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java
index ceeb46e640734525ce37acbd41dbe2137f046803..dec7b417b2683f95f363547fd4f76acf49195c4d 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java
@@ -1,5 +1,6 @@
 package theodolite.uc4.application;
 
+import java.util.Set;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeHint;
@@ -8,44 +9,46 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
 import org.apache.flink.util.Collector;
-
 import theodolite.uc4.application.util.SensorParentKey;
 import titan.ccp.model.records.ActivePowerRecord;
 
-import java.util.Set;
+public class JoinAndDuplicateCoFlatMapFunction extends
+    RichCoFlatMapFunction<ActivePowerRecord, Tuple2<String, Set<String>>, Tuple2<SensorParentKey, ActivePowerRecord>> { // NOCS
 
-public class JoinAndDuplicateCoFlatMapFunction extends RichCoFlatMapFunction<ActivePowerRecord, Tuple2<String, Set<String>>, Tuple2<SensorParentKey, ActivePowerRecord>> {
-
-  private static final long serialVersionUID = -6992783644887835979L;
+  private static final long serialVersionUID = -6992783644887835979L; // NOPMD
 
   private transient MapState<String, Set<String>> state;
 
   @Override
-  public void open(Configuration parameters) throws Exception {
-    MapStateDescriptor<String, Set<String>> descriptor =
-        new MapStateDescriptor<String, Set<String>>(
+  public void open(final Configuration parameters) throws Exception {
+    final MapStateDescriptor<String, Set<String>> descriptor =
+        new MapStateDescriptor<>(
            "join-and-duplicate-state",
-            TypeInformation.of(new TypeHint<String>(){}),
-            TypeInformation.of(new TypeHint<Set<String>>(){}));
-    this.state = getRuntimeContext().getMapState(descriptor);
+            TypeInformation.of(new TypeHint<String>() {}),
+            TypeInformation.of(new TypeHint<Set<String>>() {}));
+    this.state = this.getRuntimeContext().getMapState(descriptor);
   }
 
   @Override
-  public void flatMap1(ActivePowerRecord value, Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
-    Set<String> parents = this.state.get(value.getIdentifier());
-    if (parents == null) return;
-    for (String parent : parents) {
+  public void flatMap1(final ActivePowerRecord value,
+      final Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
+    final Set<String> parents = this.state.get(value.getIdentifier());
+    if (parents == null) {
+      return;
+    }
+    for (final String parent : parents) {
       out.collect(new Tuple2<>(new SensorParentKey(value.getIdentifier(), parent), value));
     }
   }
 
   @Override
-  public void flatMap2(Tuple2<String, Set<String>> value, Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
-    Set<String> oldParents = this.state.get(value.f0);
+  public void flatMap2(final Tuple2<String, Set<String>> value,
+      final Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
+    final Set<String> oldParents = this.state.get(value.f0);
     if (oldParents != null) {
-      Set<String> newParents = value.f1;
+      final Set<String> newParents = value.f1;
       if (!newParents.equals(oldParents)) {
-        for (String oldParent : oldParents) {
+        for (final String oldParent : oldParents) {
           if (!newParents.contains(oldParent)) {
             out.collect(new Tuple2<>(new SensorParentKey(value.f0, oldParent), null));
           }
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java
index 8e4f965b4b1d2e30dea60d7211e737616927b264..96711b2f09ad9fd6b0b2b3f98687acbf2e4c8c68 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java
@@ -1,5 +1,6 @@
 package theodolite.uc4.application;
 
+import java.util.Set;
 import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -8,15 +9,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
 import org.apache.flink.util.Collector;
-
 import theodolite.uc4.application.util.SensorParentKey;
 import titan.ccp.model.records.ActivePowerRecord;
 
-import java.util.Set;
+public class JoinAndDuplicateKeyedBroadcastProcessFunction extends
+    KeyedBroadcastProcessFunction<String, ActivePowerRecord, Tuple2<String, Set<String>>, Tuple2<SensorParentKey, ActivePowerRecord>> { // NOCS
 
-public class JoinAndDuplicateKeyedBroadcastProcessFunction extends KeyedBroadcastProcessFunction<String, ActivePowerRecord, Tuple2<String, Set<String>>, Tuple2<SensorParentKey, ActivePowerRecord>> {
-
-  private static final long serialVersionUID = -4525438547262992821L;
+  private static final long serialVersionUID = -4525438547262992821L; // NOPMD
 
   private final MapStateDescriptor<String, Set<String>> sensorConfigStateDescriptor =
       new MapStateDescriptor<>(
@@ -25,22 +24,28 @@ public class JoinAndDuplicateKeyedBroadcastProcessFunction extends KeyedBroadcas
           TypeInformation.of(new TypeHint<Set<String>>() {}));
 
   @Override
-  public void processElement(ActivePowerRecord value, ReadOnlyContext ctx, Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
-    Set<String> parents = ctx.getBroadcastState(sensorConfigStateDescriptor).get(value.getIdentifier());
-    if (parents == null) return;
-    for (String parent : parents) {
+  public void processElement(final ActivePowerRecord value, final ReadOnlyContext ctx,
+      final Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
+    final Set<String> parents =
+        ctx.getBroadcastState(this.sensorConfigStateDescriptor).get(value.getIdentifier());
+    if (parents == null) {
+      return;
+    }
+    for (final String parent : parents) {
       out.collect(new Tuple2<>(new SensorParentKey(value.getIdentifier(), parent), value));
     }
   }
 
   @Override
-  public void processBroadcastElement(Tuple2<String, Set<String>> value, Context ctx, Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
-    BroadcastState<String, Set<String>> state = ctx.getBroadcastState(sensorConfigStateDescriptor);
-    Set<String> oldParents = state.get(value.f0);
+  public void processBroadcastElement(final Tuple2<String, Set<String>> value, final Context ctx,
+      final Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
+    final BroadcastState<String, Set<String>> state =
+        ctx.getBroadcastState(this.sensorConfigStateDescriptor);
+    final Set<String> oldParents = state.get(value.f0);
     if (oldParents != null) {
-      Set<String> newParents = value.f1;
+      final Set<String> newParents = value.f1;
       if (!newParents.equals(oldParents)) {
-        for (String oldParent : oldParents) {
+        for (final String oldParent : oldParents) {
           if (!newParents.contains(oldParent)) {
             out.collect(new Tuple2<>(new SensorParentKey(value.f0, oldParent), null));
           }
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java
index ea21899f656758b93f2032634671b7cd2b285c0e..e4b545174861a70a49a0955f95b5bd7e14b2dfb6 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java
@@ -7,43 +7,43 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
-
 import theodolite.uc4.application.util.SensorParentKey;
 import titan.ccp.model.records.ActivePowerRecord;
 import titan.ccp.model.records.AggregatedActivePowerRecord;
 
-public class RecordAggregationProcessWindowFunction extends ProcessWindowFunction<Tuple2<SensorParentKey, ActivePowerRecord>, AggregatedActivePowerRecord, String, TimeWindow> {
+public class RecordAggregationProcessWindowFunction extends
+    ProcessWindowFunction<Tuple2<SensorParentKey, ActivePowerRecord>, AggregatedActivePowerRecord, String, TimeWindow> { // NOCS
 
-  private static final long serialVersionUID = 6030159552332624435L;
+  private static final long serialVersionUID = 6030159552332624435L; // NOPMD
 
   private transient MapState<SensorParentKey, ActivePowerRecord> lastValueState;
   private transient ValueState<AggregatedActivePowerRecord> aggregateState;
 
   @Override
-  public void open(org.apache.flink.configuration.Configuration parameters) {
+  public void open(final Configuration parameters) {
     final MapStateDescriptor<SensorParentKey, ActivePowerRecord> lastValueStateDescriptor =
-        new MapStateDescriptor<SensorParentKey, ActivePowerRecord>(
+        new MapStateDescriptor<>(
            "last-value-state",
-            TypeInformation.of(new TypeHint<SensorParentKey>() {
-            }),
-            TypeInformation.of(new TypeHint<ActivePowerRecord>() {
-            }));
-    this.lastValueState = getRuntimeContext().getMapState(lastValueStateDescriptor);
+            TypeInformation.of(new TypeHint<SensorParentKey>() {}),
+            TypeInformation.of(new TypeHint<ActivePowerRecord>() {}));
+    this.lastValueState = this.getRuntimeContext().getMapState(lastValueStateDescriptor);
 
     final ValueStateDescriptor<AggregatedActivePowerRecord> aggregateStateDescriptor =
-        new ValueStateDescriptor<AggregatedActivePowerRecord>(
+        new ValueStateDescriptor<>(
            "aggregation-state",
-            TypeInformation.of(new TypeHint<AggregatedActivePowerRecord>() {
-            }));
-    this.aggregateState = getRuntimeContext().getState(aggregateStateDescriptor);
+            TypeInformation.of(new TypeHint<AggregatedActivePowerRecord>() {}));
+    this.aggregateState = this.getRuntimeContext().getState(aggregateStateDescriptor);
   }
 
   @Override
-  public void process(String key, Context context, Iterable<Tuple2<SensorParentKey, ActivePowerRecord>> elements, Collector<AggregatedActivePowerRecord> out) throws Exception {
-    for (Tuple2<SensorParentKey, ActivePowerRecord> t : elements) {
+  public void process(final String key, final Context context,
+      final Iterable<Tuple2<SensorParentKey, ActivePowerRecord>> elements,
+      final Collector<AggregatedActivePowerRecord> out) throws Exception {
+    for (final Tuple2<SensorParentKey, ActivePowerRecord> t : elements) {
       AggregatedActivePowerRecord currentAggregate = this.aggregateState.value();
       if (currentAggregate == null) {
         currentAggregate = new AggregatedActivePowerRecord(key, 0L, 0L, 0.0, 0.0);
@@ -71,17 +71,18 @@ public class RecordAggregationProcessWindowFunctio
       }
 
       // prefer newer timestamp, but use previous if 0 -> sensor was deleted
-      long timestamp = newRecord.getTimestamp() == 0 ? previousRecord.getTimestamp() : newRecord.getTimestamp();
-      double sumInW = currentAggregate.getSumInW() - previousRecord.getValueInW() + newRecord.getValueInW();
-      double avgInW = count == 0 ? 0 : sumInW / count;
+      final long timestamp =
+          newRecord.getTimestamp() == 0 ? previousRecord.getTimestamp() : newRecord.getTimestamp();
+      final double sumInW =
+          currentAggregate.getSumInW() - previousRecord.getValueInW() + newRecord.getValueInW();
+      final double avgInW = count == 0 ? 0 : sumInW / count;
 
-      AggregatedActivePowerRecord newAggregate = new AggregatedActivePowerRecord(
+      final AggregatedActivePowerRecord newAggregate = new AggregatedActivePowerRecord(
          sensorParentKey.getParent(),
          timestamp,
          count,
          sumInW,
-          avgInW
-      );
+          avgInW);
 
       // update state and aggregateState
       this.lastValueState.put(sensorParentKey, newRecord);
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java
index 0de9a500c0ab3d928eabe4ea452fef066c5dfb8d..927006fabd56ab6c24532cd71184e6f7c20094ac 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java
@@ -4,21 +4,22 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
-
 import java.io.Serializable;
+import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
 
-public class ImmutableSensorRegistrySerializer extends Serializer<ImmutableSensorRegistry> implements Serializable {
+public class ImmutableSensorRegistrySerializer extends Serializer<ImmutableSensorRegistry>
+    implements Serializable {
 
-  private static final long serialVersionUID = 1806411056006113017L;
+  private static final long serialVersionUID = 1806411056006113017L; // NOPMD
 
   @Override
-  public void write(Kryo kryo, Output output, ImmutableSensorRegistry object) {
+  public void write(final Kryo kryo, final Output output, final ImmutableSensorRegistry object) {
     output.writeString(object.toJson());
   }
 
   @Override
-  public ImmutableSensorRegistry read(Kryo kryo, Input input, Class<ImmutableSensorRegistry> type) {
+  public ImmutableSensorRegistry read(final Kryo kryo, final Input input,
+      final Class<ImmutableSensorRegistry> type) {
     return (ImmutableSensorRegistry) ImmutableSensorRegistry.fromJson(input.readString());
   }
 }
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java
index c3117051ff3945f4133e3399bb29fda25d217954..913b07aaafcaad3b5247fd4bf13ec64df3469312 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java
@@ -4,18 +4,19 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-
 import java.io.Serializable;
 import java.util.Set;
 
 public final class ImmutableSetSerializer extends Serializer<Set<Object>> implements Serializable {
 
+  private static final long serialVersionUID = 6919877826110724620L; // NOPMD
+
   public ImmutableSetSerializer() {
     super(false, true);
   }
 
   @Override
-  public void write(Kryo kryo, Output output, Set<Object> object) {
+  public void write(final Kryo kryo, final Output output, final Set<Object> object) {
     output.writeInt(object.size(), true);
     for (final Object elm : object) {
       kryo.writeClassAndObject(output, elm);
@@ -23,7 +24,7 @@ public final class ImmutableSetSerializer extends Serializer<Set<Object>> implem
   }
 
   @Override
-  public Set<Object> read(Kryo kryo, Input input, Class<Set<Object>> type) {
+  public Set<Object> read(final Kryo kryo, final Input input, final Class<Set<Object>> type) {
     final int size = input.readInt(true);
     final Object[] list = new Object[size];
     for (int i = 0; i < size; ++i) {
@@ -33,15 +34,15 @@ public final class ImmutableSetSerializer extends Serializer<Set<Object>> implem
   }
 
   /**
-   * Creates a new {@link ImmutableSetSerializer} and registers its serializer
-   * for the several related classes
+   * Creates a new {@link ImmutableSetSerializer} and registers its serializer for the several
+   * related classes.
    *
   * @param kryo the {@link Kryo} instance to set the serializer on
   */
-  public static void registerSerializers(Kryo kryo) {
+  public static void registerSerializers(final Kryo kryo) {
     final ImmutableSetSerializer serializer = new ImmutableSetSerializer();
     kryo.register(Set.of().getClass(), serializer);
     kryo.register(Set.of(1).getClass(), serializer);
-    kryo.register(Set.of(1, 2, 3, 4).getClass(), serializer);
+    kryo.register(Set.of(1, 2, 3, 4).getClass(), serializer); // NOCS
   }
-}
\ No newline at end of file
+}
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/SensorParentKey.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/SensorParentKey.java
index 657918a1c91e930a898c5a0728ecf3eff0d0a106..903b66dd12a2864d522fde7eb7cf3fdc2ec73bcd 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/SensorParentKey.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/SensorParentKey.java
@@ -31,15 +31,20 @@ public class SensorParentKey {
 
   @Override
   public int hashCode() {
-    return Objects.hash(sensorIdentifier, parentIdentifier);
+    return Objects.hash(this.sensorIdentifier, this.parentIdentifier);
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (obj == this) return true;
-    if (!(obj instanceof SensorParentKey)) return false;
+  public boolean equals(final Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (!(obj instanceof SensorParentKey)) {
+      return false;
+    }
     final SensorParentKey k = (SensorParentKey) obj;
-    return sensorIdentifier.equals(k.sensorIdentifier) && parentIdentifier.equals(k.parentIdentifier);
+    return this.sensorIdentifier.equals(k.sensorIdentifier)
+        && this.parentIdentifier.equals(k.parentIdentifier);
   }
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/SensorParentKeySerializer.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/SensorParentKeySerializer.java
index ad63f9c3b45943f86be30a411d00fc1179410710..bdd403a05de8f54f636568e839f5f48effd43d58 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/SensorParentKeySerializer.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/SensorParentKeySerializer.java
@@ -4,22 +4,25 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-
 import java.io.Serializable;
 
 /**
  * Kryo serializer for {@link SensorParentKey}.
  */
-public final class SensorParentKeySerializer extends Serializer<SensorParentKey> implements Serializable {
+public final class SensorParentKeySerializer extends Serializer<SensorParentKey>
+    implements Serializable {
+
+  private static final long serialVersionUID = -867781963471414857L; // NOPMD
 
   @Override
-  public void write(Kryo kryo, Output output, SensorParentKey object) {
+  public void write(final Kryo kryo, final Output output, final SensorParentKey object) {
     output.writeString(object.getSensor());
     output.writeString(object.getParent());
   }
 
   @Override
-  public SensorParentKey read(Kryo kryo, Input input, Class<SensorParentKey> type) {
+  public SensorParentKey read(final Kryo kryo, final Input input,
+      final Class<SensorParentKey> type) {
     final String sensor = input.readString();
     final String parent = input.readString();
     return new SensorParentKey(sensor, parent);