From 373008173a8a800571d2cd84a79ba797fb3e61ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Thu, 13 Jan 2022 13:44:22 +0100 Subject: [PATCH] Fix code format --- .../java/application/Uc2BeamPipeline.java | 12 +- .../.settings/org.eclipse.jdt.ui.prefs | 283 +++++++++++++++++ .../java/application/Uc3BeamPipeline.java | 53 ++-- .../.settings/org.eclipse.jdt.ui.prefs | 284 ++++++++++++++++++ .../java/application/Uc4BeamPipeline.java | 61 ++-- 5 files changed, 614 insertions(+), 79 deletions(-) create mode 100644 theodolite-benchmarks/uc3-beam/.settings/org.eclipse.jdt.ui.prefs create mode 100644 theodolite-benchmarks/uc4-beam/.settings/org.eclipse.jdt.ui.prefs diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java index 1947917d7..02eec9868 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java +++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java @@ -25,12 +25,7 @@ import titan.ccp.model.records.ActivePowerRecord; /** - * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To - * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload - * generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST adress - * using--flinkMaster as run parameter. To persist logs add - * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard - * Input Output in Common in the Run Configuration Start via Eclipse Run. + * Implementation of the use case Downsampling using Apache Beam. */ public final class Uc2BeamPipeline extends AbstractPipeline { @@ -39,9 +34,8 @@ public final class Uc2BeamPipeline extends AbstractPipeline { // Additional needed variables final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); - final int windowDurationMinutes = Integer.parseInt( - config.getString(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES)); - final Duration duration = Duration.standardMinutes(windowDurationMinutes); + final Duration duration = + Duration.standardMinutes(config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES)); // Build kafka configuration final Map<String, Object> consumerConfig = buildConsumerConfig(); diff --git a/theodolite-benchmarks/uc3-beam/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc3-beam/.settings/org.eclipse.jdt.ui.prefs new file mode 100644 index 000000000..8527c47c1 --- /dev/null +++ b/theodolite-benchmarks/uc3-beam/.settings/org.eclipse.jdt.ui.prefs @@ -0,0 +1,283 @@ +cleanup.add_all=false +cleanup.add_default_serial_version_id=true +cleanup.add_generated_serial_version_id=false +cleanup.add_missing_annotations=true +cleanup.add_missing_deprecated_annotations=true +cleanup.add_missing_methods=false +cleanup.add_missing_nls_tags=false +cleanup.add_missing_override_annotations=true +cleanup.add_missing_override_annotations_interface_methods=true +cleanup.add_serial_version_id=false +cleanup.always_use_blocks=true +cleanup.always_use_parentheses_in_expressions=false +cleanup.always_use_this_for_non_static_field_access=true +cleanup.always_use_this_for_non_static_method_access=true +cleanup.array_with_curly=false +cleanup.arrays_fill=false +cleanup.bitwise_conditional_expression=false +cleanup.boolean_literal=false +cleanup.boolean_value_rather_than_comparison=true +cleanup.break_loop=false +cleanup.collection_cloning=false +cleanup.comparing_on_criteria=false +cleanup.comparison_statement=false +cleanup.controlflow_merge=false +cleanup.convert_functional_interfaces=false +cleanup.convert_to_enhanced_for_loop=true +cleanup.convert_to_enhanced_for_loop_if_loop_var_used=true +cleanup.convert_to_switch_expressions=false +cleanup.correct_indentation=true +cleanup.do_while_rather_than_while=true +cleanup.double_negation=false +cleanup.else_if=false +cleanup.embedded_if=false +cleanup.evaluate_nullable=false +cleanup.extract_increment=false +cleanup.format_source_code=true +cleanup.format_source_code_changes_only=false +cleanup.hash=false +cleanup.if_condition=false +cleanup.insert_inferred_type_arguments=false +cleanup.instanceof=false +cleanup.instanceof_keyword=false +cleanup.invert_equals=false +cleanup.join=false +cleanup.lazy_logical_operator=false +cleanup.make_local_variable_final=true +cleanup.make_parameters_final=true +cleanup.make_private_fields_final=true +cleanup.make_type_abstract_if_missing_method=false +cleanup.make_variable_declarations_final=true +cleanup.map_cloning=false +cleanup.merge_conditional_blocks=false +cleanup.multi_catch=false +cleanup.never_use_blocks=false +cleanup.never_use_parentheses_in_expressions=true +cleanup.no_string_creation=false +cleanup.no_super=false +cleanup.number_suffix=false +cleanup.objects_equals=false +cleanup.one_if_rather_than_duplicate_blocks_that_fall_through=true +cleanup.operand_factorization=false +cleanup.organize_imports=true +cleanup.overridden_assignment=false +cleanup.plain_replacement=false +cleanup.precompile_regex=false +cleanup.primitive_comparison=false +cleanup.primitive_parsing=false +cleanup.primitive_rather_than_wrapper=true +cleanup.primitive_serialization=false +cleanup.pull_out_if_from_if_else=false +cleanup.pull_up_assignment=false +cleanup.push_down_negation=false +cleanup.qualify_static_field_accesses_with_declaring_class=false +cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +cleanup.qualify_static_member_accesses_with_declaring_class=true +cleanup.qualify_static_method_accesses_with_declaring_class=false +cleanup.reduce_indentation=false +cleanup.redundant_comparator=false +cleanup.redundant_falling_through_block_end=false +cleanup.remove_private_constructors=true +cleanup.remove_redundant_modifiers=false +cleanup.remove_redundant_semicolons=true +cleanup.remove_redundant_type_arguments=true +cleanup.remove_trailing_whitespaces=true +cleanup.remove_trailing_whitespaces_all=true +cleanup.remove_trailing_whitespaces_ignore_empty=false +cleanup.remove_unnecessary_array_creation=false +cleanup.remove_unnecessary_casts=true +cleanup.remove_unnecessary_nls_tags=true +cleanup.remove_unused_imports=true +cleanup.remove_unused_local_variables=false +cleanup.remove_unused_private_fields=true +cleanup.remove_unused_private_members=false +cleanup.remove_unused_private_methods=true +cleanup.remove_unused_private_types=true +cleanup.return_expression=false +cleanup.simplify_lambda_expression_and_method_ref=false +cleanup.single_used_field=false +cleanup.sort_members=false +cleanup.sort_members_all=false +cleanup.standard_comparison=false +cleanup.static_inner_class=false +cleanup.strictly_equal_or_different=false +cleanup.stringbuffer_to_stringbuilder=false +cleanup.stringbuilder=false +cleanup.stringbuilder_for_local_vars=true +cleanup.substring=false +cleanup.switch=false +cleanup.system_property=false +cleanup.system_property_boolean=false +cleanup.system_property_file_encoding=false +cleanup.system_property_file_separator=false +cleanup.system_property_line_separator=false +cleanup.system_property_path_separator=false +cleanup.ternary_operator=false +cleanup.try_with_resource=false +cleanup.unlooped_while=false +cleanup.unreachable_block=false +cleanup.use_anonymous_class_creation=false +cleanup.use_autoboxing=false +cleanup.use_blocks=true +cleanup.use_blocks_only_for_return_and_throw=false +cleanup.use_directly_map_method=false +cleanup.use_lambda=true +cleanup.use_parentheses_in_expressions=true +cleanup.use_string_is_blank=false +cleanup.use_this_for_non_static_field_access=true +cleanup.use_this_for_non_static_field_access_only_if_necessary=false +cleanup.use_this_for_non_static_method_access=true +cleanup.use_this_for_non_static_method_access_only_if_necessary=false +cleanup.use_unboxing=false +cleanup.use_var=false +cleanup.useless_continue=false +cleanup.useless_return=false +cleanup.valueof_rather_than_instantiation=false +cleanup_profile=_CAU-SE-Style +cleanup_settings_version=2 +eclipse.preferences.version=1 +editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true +formatter_profile=_CAU-SE-Style +formatter_settings_version=21 +org.eclipse.jdt.ui.ignorelowercasenames=true +org.eclipse.jdt.ui.importorder=java;javax;org;com; +org.eclipse.jdt.ui.ondemandthreshold=99 +org.eclipse.jdt.ui.staticondemandthreshold=99 +sp_cleanup.add_all=false +sp_cleanup.add_default_serial_version_id=true +sp_cleanup.add_generated_serial_version_id=false +sp_cleanup.add_missing_annotations=true +sp_cleanup.add_missing_deprecated_annotations=true +sp_cleanup.add_missing_methods=false +sp_cleanup.add_missing_nls_tags=false +sp_cleanup.add_missing_override_annotations=true +sp_cleanup.add_missing_override_annotations_interface_methods=true +sp_cleanup.add_serial_version_id=false +sp_cleanup.always_use_blocks=true +sp_cleanup.always_use_parentheses_in_expressions=false +sp_cleanup.always_use_this_for_non_static_field_access=true +sp_cleanup.always_use_this_for_non_static_method_access=true +sp_cleanup.array_with_curly=false +sp_cleanup.arrays_fill=false +sp_cleanup.bitwise_conditional_expression=false +sp_cleanup.boolean_literal=false +sp_cleanup.boolean_value_rather_than_comparison=false +sp_cleanup.break_loop=false +sp_cleanup.collection_cloning=false +sp_cleanup.comparing_on_criteria=false +sp_cleanup.comparison_statement=false +sp_cleanup.controlflow_merge=false +sp_cleanup.convert_functional_interfaces=false +sp_cleanup.convert_to_enhanced_for_loop=false +sp_cleanup.convert_to_enhanced_for_loop_if_loop_var_used=false +sp_cleanup.convert_to_switch_expressions=false +sp_cleanup.correct_indentation=true +sp_cleanup.do_while_rather_than_while=false +sp_cleanup.double_negation=false +sp_cleanup.else_if=false +sp_cleanup.embedded_if=false +sp_cleanup.evaluate_nullable=false +sp_cleanup.extract_increment=false +sp_cleanup.format_source_code=true +sp_cleanup.format_source_code_changes_only=false +sp_cleanup.hash=false +sp_cleanup.if_condition=false +sp_cleanup.insert_inferred_type_arguments=false +sp_cleanup.instanceof=false +sp_cleanup.instanceof_keyword=false +sp_cleanup.invert_equals=false +sp_cleanup.join=false +sp_cleanup.lazy_logical_operator=false +sp_cleanup.make_local_variable_final=true +sp_cleanup.make_parameters_final=false +sp_cleanup.make_private_fields_final=true +sp_cleanup.make_type_abstract_if_missing_method=false +sp_cleanup.make_variable_declarations_final=true +sp_cleanup.map_cloning=false +sp_cleanup.merge_conditional_blocks=false +sp_cleanup.multi_catch=false +sp_cleanup.never_use_blocks=false +sp_cleanup.never_use_parentheses_in_expressions=true +sp_cleanup.no_string_creation=false +sp_cleanup.no_super=false +sp_cleanup.number_suffix=false +sp_cleanup.objects_equals=false +sp_cleanup.on_save_use_additional_actions=true +sp_cleanup.one_if_rather_than_duplicate_blocks_that_fall_through=false +sp_cleanup.operand_factorization=false +sp_cleanup.organize_imports=true +sp_cleanup.overridden_assignment=false +sp_cleanup.plain_replacement=false +sp_cleanup.precompile_regex=false +sp_cleanup.primitive_comparison=false +sp_cleanup.primitive_parsing=false +sp_cleanup.primitive_rather_than_wrapper=false +sp_cleanup.primitive_serialization=false +sp_cleanup.pull_out_if_from_if_else=false +sp_cleanup.pull_up_assignment=false +sp_cleanup.push_down_negation=false +sp_cleanup.qualify_static_field_accesses_with_declaring_class=false +sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_with_declaring_class=true +sp_cleanup.qualify_static_method_accesses_with_declaring_class=false +sp_cleanup.reduce_indentation=false +sp_cleanup.redundant_comparator=false +sp_cleanup.redundant_falling_through_block_end=false +sp_cleanup.remove_private_constructors=true +sp_cleanup.remove_redundant_modifiers=false +sp_cleanup.remove_redundant_semicolons=true +sp_cleanup.remove_redundant_type_arguments=true +sp_cleanup.remove_trailing_whitespaces=true +sp_cleanup.remove_trailing_whitespaces_all=true +sp_cleanup.remove_trailing_whitespaces_ignore_empty=false +sp_cleanup.remove_unnecessary_array_creation=false +sp_cleanup.remove_unnecessary_casts=true +sp_cleanup.remove_unnecessary_nls_tags=false +sp_cleanup.remove_unused_imports=true +sp_cleanup.remove_unused_local_variables=false +sp_cleanup.remove_unused_private_fields=true +sp_cleanup.remove_unused_private_members=false +sp_cleanup.remove_unused_private_methods=true +sp_cleanup.remove_unused_private_types=true +sp_cleanup.return_expression=false +sp_cleanup.simplify_lambda_expression_and_method_ref=false +sp_cleanup.single_used_field=false +sp_cleanup.sort_members=false +sp_cleanup.sort_members_all=false +sp_cleanup.standard_comparison=false +sp_cleanup.static_inner_class=false +sp_cleanup.strictly_equal_or_different=false +sp_cleanup.stringbuffer_to_stringbuilder=false +sp_cleanup.stringbuilder=false +sp_cleanup.stringbuilder_for_local_vars=true +sp_cleanup.substring=false +sp_cleanup.switch=false +sp_cleanup.system_property=false +sp_cleanup.system_property_boolean=false +sp_cleanup.system_property_file_encoding=false +sp_cleanup.system_property_file_separator=false +sp_cleanup.system_property_line_separator=false +sp_cleanup.system_property_path_separator=false +sp_cleanup.ternary_operator=false +sp_cleanup.try_with_resource=true +sp_cleanup.unlooped_while=false +sp_cleanup.unreachable_block=false +sp_cleanup.use_anonymous_class_creation=false +sp_cleanup.use_autoboxing=false +sp_cleanup.use_blocks=true +sp_cleanup.use_blocks_only_for_return_and_throw=false +sp_cleanup.use_directly_map_method=false +sp_cleanup.use_lambda=true +sp_cleanup.use_parentheses_in_expressions=true +sp_cleanup.use_string_is_blank=false +sp_cleanup.use_this_for_non_static_field_access=true +sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false +sp_cleanup.use_this_for_non_static_method_access=true +sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false +sp_cleanup.use_unboxing=false +sp_cleanup.use_var=false +sp_cleanup.useless_continue=false +sp_cleanup.useless_return=false +sp_cleanup.valueof_rather_than_instantiation=false diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java index 7424a19fe..c231c020e 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java @@ -1,7 +1,5 @@ package application; -import com.google.common.math.Stats; -import com.google.common.math.StatsAccumulator; import java.util.Map; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; @@ -18,6 +16,8 @@ import org.apache.beam.sdk.values.KV; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.common.serialization.StringSerializer; import org.joda.time.Duration; +import com.google.common.math.Stats; +import com.google.common.math.StatsAccumulator; import theodolite.commons.beam.AbstractPipeline; import theodolite.commons.beam.ConfigurationKeys; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; @@ -26,12 +26,7 @@ import titan.ccp.model.records.ActivePowerRecord; /** - * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To - * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload - * generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST adress - * using--flinkMaster as run parameter. To persist logs add - * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard - * Input Output in Common in the Run Configuration Start via Eclipse Run. + * Implementation of the use case Aggregation based on Time Attributes using Apache Beam. */ public final class Uc3BeamPipeline extends AbstractPipeline { @@ -40,46 +35,41 @@ public final class Uc3BeamPipeline extends AbstractPipeline { // Additional needed variables final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); - final int windowDurationDays = Integer.parseInt( - config.getString(ConfigurationKeys.AGGREGATION_DURATION_DAYS)); - final Duration duration = Duration.standardDays(windowDurationDays); - - final int aggregationAdvance = Integer.parseInt( - config.getString(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)); - final Duration aggregationAdvanceDuration = Duration.standardDays(aggregationAdvance); - - final int triggerInterval = Integer.parseInt( - config.getString(ConfigurationKeys.TRIGGER_INTERVAL)); - final Duration triggerDelay = Duration.standardSeconds(triggerInterval); + final Duration duration = + Duration.standardDays(config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)); + final Duration aggregationAdvanceDuration = + Duration.standardDays(config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)); + final Duration triggerDelay = + Duration.standardSeconds(config.getInt(ConfigurationKeys.TRIGGER_INTERVAL)); - // Build kafka configuration - final Map consumerConfig = buildConsumerConfig(); + // Build Kafka configuration + final Map<String, Object> consumerConfig = this.buildConsumerConfig(); - // Set Coders for Classes that will be distributed + // Set Coders for classes that will be distributed final CoderRegistry cr = this.getCoderRegistry(); registerCoders(cr); // Read from Kafka final KafkaActivePowerTimestampReader kafka = - new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig); + new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig); // Map the time format final MapTimeFormat mapTimeFormat = new MapTimeFormat(); - // get the stats per HourOfDay + // Get the stats per HourOfDay final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats(); // Write to Kafka final KafkaWriterTransformation<String> kafkaWriter = - new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class); + new KafkaWriterTransformation<>(this.bootstrapServer, outputTopic, StringSerializer.class); this.apply(kafka) // Map to correct time format .apply(MapElements.via(mapTimeFormat)) // Apply a sliding window .apply(Window - .<KV<HourOfDayKey, ActivePowerRecord>> - into(SlidingWindows.of(duration).every(aggregationAdvanceDuration)) + .<KV<HourOfDayKey, ActivePowerRecord>>into( + SlidingWindows.of(duration).every(aggregationAdvanceDuration)) .triggering(AfterWatermark.pastEndOfWindow() .withEarlyFirings( AfterProcessingTime.pastFirstElementInPane().plusDelayOf(triggerDelay))) @@ -87,13 +77,11 @@ public final class Uc3BeamPipeline extends AbstractPipeline { .accumulatingFiredPanes()) // Aggregate per window for every key - .apply(Combine.<HourOfDayKey, ActivePowerRecord, Stats>perKey( - new StatsAggregation())) + .apply(Combine.<HourOfDayKey, ActivePowerRecord, Stats>perKey(new StatsAggregation())) .setCoder(KvCoder.of(new HourOfDaykeyCoder(), SerializableCoder.of(Stats.class))) // Map into correct output format - .apply(MapElements - .via(hourOfDayWithStats)) + .apply(MapElements.via(hourOfDayWithStats)) // Write to Kafka .apply(kafkaWriter); } @@ -107,8 +95,7 @@ public final class Uc3BeamPipeline extends AbstractPipeline { private static void registerCoders(final CoderRegistry cr) { cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); cr.registerCoderForClass(HourOfDayKey.class, new HourOfDaykeyCoder()); - cr.registerCoderForClass(StatsAggregation.class, - SerializableCoder.of(StatsAggregation.class)); + cr.registerCoderForClass(StatsAggregation.class, SerializableCoder.of(StatsAggregation.class)); cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class)); } } diff --git a/theodolite-benchmarks/uc4-beam/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc4-beam/.settings/org.eclipse.jdt.ui.prefs new file mode 100644 index 000000000..f48c8c940 --- /dev/null +++ b/theodolite-benchmarks/uc4-beam/.settings/org.eclipse.jdt.ui.prefs @@ -0,0 +1,284 @@ +cleanup.add_all=false +cleanup.add_default_serial_version_id=true +cleanup.add_generated_serial_version_id=false +cleanup.add_missing_annotations=true +cleanup.add_missing_deprecated_annotations=true +cleanup.add_missing_methods=false +cleanup.add_missing_nls_tags=false +cleanup.add_missing_override_annotations=true +cleanup.add_missing_override_annotations_interface_methods=true +cleanup.add_serial_version_id=false +cleanup.always_use_blocks=true +cleanup.always_use_parentheses_in_expressions=false +cleanup.always_use_this_for_non_static_field_access=true +cleanup.always_use_this_for_non_static_method_access=true +cleanup.array_with_curly=false +cleanup.arrays_fill=false +cleanup.bitwise_conditional_expression=false +cleanup.boolean_literal=false +cleanup.boolean_value_rather_than_comparison=true +cleanup.break_loop=false +cleanup.collection_cloning=false +cleanup.comparing_on_criteria=false +cleanup.comparison_statement=false +cleanup.controlflow_merge=false +cleanup.convert_functional_interfaces=false +cleanup.convert_to_enhanced_for_loop=true +cleanup.convert_to_enhanced_for_loop_if_loop_var_used=true +cleanup.convert_to_switch_expressions=false +cleanup.correct_indentation=true +cleanup.do_while_rather_than_while=true +cleanup.double_negation=false +cleanup.else_if=false +cleanup.embedded_if=false +cleanup.evaluate_nullable=false +cleanup.extract_increment=false +cleanup.format_source_code=true +cleanup.format_source_code_changes_only=false +cleanup.hash=false +cleanup.if_condition=false +cleanup.insert_inferred_type_arguments=false +cleanup.instanceof=false +cleanup.instanceof_keyword=false +cleanup.invert_equals=false +cleanup.join=false +cleanup.lazy_logical_operator=false +cleanup.make_local_variable_final=true +cleanup.make_parameters_final=true +cleanup.make_private_fields_final=true +cleanup.make_type_abstract_if_missing_method=false +cleanup.make_variable_declarations_final=true +cleanup.map_cloning=false +cleanup.merge_conditional_blocks=false +cleanup.multi_catch=false +cleanup.never_use_blocks=false +cleanup.never_use_parentheses_in_expressions=true +cleanup.no_string_creation=false +cleanup.no_super=false +cleanup.number_suffix=false +cleanup.objects_equals=false +cleanup.one_if_rather_than_duplicate_blocks_that_fall_through=true +cleanup.operand_factorization=false +cleanup.organize_imports=true +cleanup.overridden_assignment=false +cleanup.plain_replacement=false +cleanup.precompile_regex=false +cleanup.primitive_comparison=false +cleanup.primitive_parsing=false +cleanup.primitive_rather_than_wrapper=true +cleanup.primitive_serialization=false +cleanup.pull_out_if_from_if_else=false +cleanup.pull_up_assignment=false +cleanup.push_down_negation=false +cleanup.qualify_static_field_accesses_with_declaring_class=false +cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +cleanup.qualify_static_member_accesses_with_declaring_class=true +cleanup.qualify_static_method_accesses_with_declaring_class=false +cleanup.reduce_indentation=false +cleanup.redundant_comparator=false +cleanup.redundant_falling_through_block_end=false +cleanup.remove_private_constructors=true +cleanup.remove_redundant_modifiers=false +cleanup.remove_redundant_semicolons=true +cleanup.remove_redundant_type_arguments=true +cleanup.remove_trailing_whitespaces=true +cleanup.remove_trailing_whitespaces_all=true +cleanup.remove_trailing_whitespaces_ignore_empty=false +cleanup.remove_unnecessary_array_creation=false +cleanup.remove_unnecessary_casts=true +cleanup.remove_unnecessary_nls_tags=true +cleanup.remove_unused_imports=true +cleanup.remove_unused_local_variables=false +cleanup.remove_unused_private_fields=true +cleanup.remove_unused_private_members=false +cleanup.remove_unused_private_methods=true +cleanup.remove_unused_private_types=true +cleanup.return_expression=false +cleanup.simplify_lambda_expression_and_method_ref=false +cleanup.single_used_field=false +cleanup.sort_members=false +cleanup.sort_members_all=false +cleanup.standard_comparison=false +cleanup.static_inner_class=false +cleanup.strictly_equal_or_different=false +cleanup.stringbuffer_to_stringbuilder=false +cleanup.stringbuilder=false +cleanup.stringbuilder_for_local_vars=true +cleanup.substring=false +cleanup.switch=false +cleanup.system_property=false +cleanup.system_property_boolean=false +cleanup.system_property_file_encoding=false +cleanup.system_property_file_separator=false +cleanup.system_property_line_separator=false +cleanup.system_property_path_separator=false +cleanup.ternary_operator=false +cleanup.try_with_resource=false +cleanup.unlooped_while=false +cleanup.unreachable_block=false +cleanup.use_anonymous_class_creation=false +cleanup.use_autoboxing=false +cleanup.use_blocks=true +cleanup.use_blocks_only_for_return_and_throw=false +cleanup.use_directly_map_method=false +cleanup.use_lambda=true +cleanup.use_parentheses_in_expressions=true +cleanup.use_string_is_blank=false +cleanup.use_this_for_non_static_field_access=true +cleanup.use_this_for_non_static_field_access_only_if_necessary=false +cleanup.use_this_for_non_static_method_access=true +cleanup.use_this_for_non_static_method_access_only_if_necessary=false +cleanup.use_unboxing=false +cleanup.use_var=false +cleanup.useless_continue=false +cleanup.useless_return=false +cleanup.valueof_rather_than_instantiation=false +cleanup_profile=_CAU-SE-Style +cleanup_settings_version=2 +eclipse.preferences.version=1 +editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true +formatter_profile=_CAU-SE-Style +formatter_settings_version=21 +org.eclipse.jdt.ui.ignorelowercasenames=true +org.eclipse.jdt.ui.importorder=java;javax;org;com; +org.eclipse.jdt.ui.ondemandthreshold=99 +org.eclipse.jdt.ui.staticondemandthreshold=99 +org.eclipse.jdt.ui.text.custom_code_templates= +sp_cleanup.add_all=false +sp_cleanup.add_default_serial_version_id=true +sp_cleanup.add_generated_serial_version_id=false +sp_cleanup.add_missing_annotations=true +sp_cleanup.add_missing_deprecated_annotations=true +sp_cleanup.add_missing_methods=false +sp_cleanup.add_missing_nls_tags=false +sp_cleanup.add_missing_override_annotations=true +sp_cleanup.add_missing_override_annotations_interface_methods=true +sp_cleanup.add_serial_version_id=false +sp_cleanup.always_use_blocks=true +sp_cleanup.always_use_parentheses_in_expressions=false +sp_cleanup.always_use_this_for_non_static_field_access=true +sp_cleanup.always_use_this_for_non_static_method_access=true +sp_cleanup.array_with_curly=false +sp_cleanup.arrays_fill=false +sp_cleanup.bitwise_conditional_expression=false +sp_cleanup.boolean_literal=false +sp_cleanup.boolean_value_rather_than_comparison=false +sp_cleanup.break_loop=false +sp_cleanup.collection_cloning=false +sp_cleanup.comparing_on_criteria=false +sp_cleanup.comparison_statement=false +sp_cleanup.controlflow_merge=false +sp_cleanup.convert_functional_interfaces=false +sp_cleanup.convert_to_enhanced_for_loop=false +sp_cleanup.convert_to_enhanced_for_loop_if_loop_var_used=false +sp_cleanup.convert_to_switch_expressions=false +sp_cleanup.correct_indentation=true +sp_cleanup.do_while_rather_than_while=false +sp_cleanup.double_negation=false +sp_cleanup.else_if=false +sp_cleanup.embedded_if=false +sp_cleanup.evaluate_nullable=false +sp_cleanup.extract_increment=false +sp_cleanup.format_source_code=true +sp_cleanup.format_source_code_changes_only=false +sp_cleanup.hash=false +sp_cleanup.if_condition=false +sp_cleanup.insert_inferred_type_arguments=false +sp_cleanup.instanceof=false +sp_cleanup.instanceof_keyword=false +sp_cleanup.invert_equals=false +sp_cleanup.join=false +sp_cleanup.lazy_logical_operator=false +sp_cleanup.make_local_variable_final=true +sp_cleanup.make_parameters_final=false +sp_cleanup.make_private_fields_final=true +sp_cleanup.make_type_abstract_if_missing_method=false +sp_cleanup.make_variable_declarations_final=true +sp_cleanup.map_cloning=false +sp_cleanup.merge_conditional_blocks=false +sp_cleanup.multi_catch=false +sp_cleanup.never_use_blocks=false +sp_cleanup.never_use_parentheses_in_expressions=true +sp_cleanup.no_string_creation=false +sp_cleanup.no_super=false +sp_cleanup.number_suffix=false +sp_cleanup.objects_equals=false +sp_cleanup.on_save_use_additional_actions=true +sp_cleanup.one_if_rather_than_duplicate_blocks_that_fall_through=false +sp_cleanup.operand_factorization=false +sp_cleanup.organize_imports=true +sp_cleanup.overridden_assignment=false +sp_cleanup.plain_replacement=false +sp_cleanup.precompile_regex=false +sp_cleanup.primitive_comparison=false +sp_cleanup.primitive_parsing=false +sp_cleanup.primitive_rather_than_wrapper=false +sp_cleanup.primitive_serialization=false +sp_cleanup.pull_out_if_from_if_else=false +sp_cleanup.pull_up_assignment=false +sp_cleanup.push_down_negation=false +sp_cleanup.qualify_static_field_accesses_with_declaring_class=false +sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_with_declaring_class=true +sp_cleanup.qualify_static_method_accesses_with_declaring_class=false +sp_cleanup.reduce_indentation=false +sp_cleanup.redundant_comparator=false +sp_cleanup.redundant_falling_through_block_end=false +sp_cleanup.remove_private_constructors=true +sp_cleanup.remove_redundant_modifiers=false +sp_cleanup.remove_redundant_semicolons=false +sp_cleanup.remove_redundant_type_arguments=false +sp_cleanup.remove_trailing_whitespaces=true +sp_cleanup.remove_trailing_whitespaces_all=true +sp_cleanup.remove_trailing_whitespaces_ignore_empty=false +sp_cleanup.remove_unnecessary_array_creation=false +sp_cleanup.remove_unnecessary_casts=true +sp_cleanup.remove_unnecessary_nls_tags=false +sp_cleanup.remove_unused_imports=true +sp_cleanup.remove_unused_local_variables=false +sp_cleanup.remove_unused_private_fields=true +sp_cleanup.remove_unused_private_members=false +sp_cleanup.remove_unused_private_methods=true +sp_cleanup.remove_unused_private_types=true +sp_cleanup.return_expression=false +sp_cleanup.simplify_lambda_expression_and_method_ref=false +sp_cleanup.single_used_field=false +sp_cleanup.sort_members=false +sp_cleanup.sort_members_all=false +sp_cleanup.standard_comparison=false +sp_cleanup.static_inner_class=false +sp_cleanup.strictly_equal_or_different=false +sp_cleanup.stringbuffer_to_stringbuilder=false +sp_cleanup.stringbuilder=false +sp_cleanup.stringbuilder_for_local_vars=true +sp_cleanup.substring=false +sp_cleanup.switch=false +sp_cleanup.system_property=false +sp_cleanup.system_property_boolean=false +sp_cleanup.system_property_file_encoding=false +sp_cleanup.system_property_file_separator=false +sp_cleanup.system_property_line_separator=false +sp_cleanup.system_property_path_separator=false +sp_cleanup.ternary_operator=false +sp_cleanup.try_with_resource=false +sp_cleanup.unlooped_while=false +sp_cleanup.unreachable_block=false +sp_cleanup.use_anonymous_class_creation=false +sp_cleanup.use_autoboxing=false +sp_cleanup.use_blocks=true +sp_cleanup.use_blocks_only_for_return_and_throw=false +sp_cleanup.use_directly_map_method=false +sp_cleanup.use_lambda=true +sp_cleanup.use_parentheses_in_expressions=true +sp_cleanup.use_string_is_blank=false +sp_cleanup.use_this_for_non_static_field_access=true +sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false +sp_cleanup.use_this_for_non_static_method_access=true +sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false +sp_cleanup.use_unboxing=false +sp_cleanup.use_var=false +sp_cleanup.useless_continue=false +sp_cleanup.useless_return=false +sp_cleanup.valueof_rather_than_instantiation=false diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java index 78083db12..df436366f 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java @@ -1,6 +1,5 @@ -package application; //NOPMD +package application; // NOPMD -import com.google.common.math.StatsAccumulator; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -31,6 +30,7 @@ import org.apache.commons.configuration2.Configuration; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.joda.time.Duration; +import com.google.common.math.StatsAccumulator; import serialization.AggregatedActivePowerRecordCoder; import serialization.AggregatedActivePowerRecordDeserializer; import serialization.AggregatedActivePowerRecordSerializer; @@ -47,16 +47,11 @@ import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.AggregatedActivePowerRecord; /** - * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To - * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload - * generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST address - * using--flinkMaster as run parameter. To persist logs add - * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard - * Input Output in Common in the Run Configuration Start via Eclipse Run. + * Implementation of the use case Hierarchical Aggregation using Apache Beam. */ public final class Uc4BeamPipeline extends AbstractPipeline { - protected Uc4BeamPipeline(final PipelineOptions options, final Configuration config) { //NOPMD + protected Uc4BeamPipeline(final PipelineOptions options, final Configuration config) { // NOPMD super(options, config); // Additional needed variables @@ -64,21 +59,16 @@ public final class Uc4BeamPipeline extends AbstractPipeline { final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final String configurationTopic = config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC); - final int windowDurationMinutes = Integer.parseInt( - config.getString(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES)); - final Duration duration = Duration.standardSeconds(windowDurationMinutes); - - final int triggerInterval = Integer.parseInt( - config.getString(ConfigurationKeys.TRIGGER_INTERVAL)); - final Duration triggerDelay = Duration.standardSeconds(triggerInterval); - - final int grace = Integer.parseInt( - config.getString(ConfigurationKeys.GRACE_PERIOD_MS)); - final Duration gracePeriod = Duration.standardSeconds(grace); + final Duration duration = + Duration.standardSeconds(config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES)); + final Duration triggerDelay = + Duration.standardSeconds(config.getInt(ConfigurationKeys.TRIGGER_INTERVAL)); + final Duration gracePeriod = + Duration.standardSeconds(config.getInt(ConfigurationKeys.GRACE_PERIOD_MS)); // Build kafka configuration - final Map<String, Object> consumerConfig = buildConsumerConfig(); - final Map<String, Object> configurationConfig = configurationConfig(config); + final Map<String, Object> consumerConfig = this.buildConsumerConfig(); + final Map<String, Object> configurationConfig = this.configurationConfig(config); // Set Coders for Classes that will be distributed final CoderRegistry cr = this.getCoderRegistry(); @@ -86,15 +76,13 @@ public final class Uc4BeamPipeline extends AbstractPipeline { // Read from Kafka // ActivePowerRecords - final KafkaActivePowerTimestampReader - kafkaActivePowerRecordReader = - new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig); + final KafkaActivePowerTimestampReader kafkaActivePowerRecordReader = + new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig); - //Configuration Events - final KafkaGenericReader<Event, String> - kafkaConfigurationReader = + // Configuration Events + final KafkaGenericReader<Event, String> kafkaConfigurationReader = new KafkaGenericReader<>( - bootstrapServer, configurationTopic, configurationConfig, + this.bootstrapServer, configurationTopic, configurationConfig, EventDeserializer.class, StringDeserializer.class); // Transform into AggregatedActivePowerRecords into ActivePowerRecords @@ -103,11 +91,11 @@ public final class Uc4BeamPipeline extends AbstractPipeline { // Write to Kafka final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaOutput = new KafkaWriterTransformation<>( - bootstrapServer, outputTopic, AggregatedActivePowerRecordSerializer.class); + this.bootstrapServer, outputTopic, AggregatedActivePowerRecordSerializer.class); final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaFeedback = new KafkaWriterTransformation<>( - bootstrapServer, feedbackTopic, AggregatedActivePowerRecordSerializer.class); + this.bootstrapServer, feedbackTopic, AggregatedActivePowerRecordSerializer.class); // Apply pipeline transformations final PCollection<KV<String, ActivePowerRecord>> values = this @@ -124,7 +112,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline { // Read the results of earlier aggregations. final PCollection<KV<String, ActivePowerRecord>> aggregationsInput = this .apply("Read aggregation results", KafkaIO.<String, AggregatedActivePowerRecord>read() - .withBootstrapServers(bootstrapServer) + .withBootstrapServers(this.bootstrapServer) .withTopic(feedbackTopic) .withKeyDeserializer(StringDeserializer.class) .withValueDeserializer(AggregatedActivePowerRecordDeserializer.class) @@ -187,9 +175,9 @@ public final class Uc4BeamPipeline extends AbstractPipeline { // Build pairs of every sensor reading and parent final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues = inputCollection.apply( - "Duplicate as flatMap", - ParDo.of(new DuplicateAsFlatMap(childParentPairMap)) - .withSideInputs(childParentPairMap)) + "Duplicate as flatMap", + ParDo.of(new DuplicateAsFlatMap(childParentPairMap)) + .withSideInputs(childParentPairMap)) .apply("Filter only latest changes", Latest.perKey()) .apply("Filter out null values", Filter.by(filterNullValues)); @@ -198,8 +186,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline { final SetKeyToGroup setKeyToGroup = new SetKeyToGroup(); // Aggregate for every sensor group of the current level - final PCollection<KV<String, AggregatedActivePowerRecord>> - aggregations = flatMappedValues + final PCollection<KV<String, AggregatedActivePowerRecord>> aggregations = flatMappedValues .apply("Set key to group", MapElements.via(setKeyToGroup)) // Reset trigger to avoid synchronized processing time .apply("Reset trigger for aggregations", Window -- GitLab