Skip to content
Snippets Groups Projects
Commit befcf2e1 authored by Sören Henning's avatar Sören Henning
Browse files

Clean up code

parent 8d2dd54a
Branches
Tags
1 merge request!187Migrate Beam benchmark implementation
Pipeline #5943 passed
Showing
with 305 additions and 23 deletions
cleanup.add_all=false
cleanup.add_default_serial_version_id=true
cleanup.add_generated_serial_version_id=false
cleanup.add_missing_annotations=true
cleanup.add_missing_deprecated_annotations=true
cleanup.add_missing_methods=false
cleanup.add_missing_nls_tags=false
cleanup.add_missing_override_annotations=true
cleanup.add_missing_override_annotations_interface_methods=true
cleanup.add_serial_version_id=false
cleanup.always_use_blocks=true
cleanup.always_use_parentheses_in_expressions=false
cleanup.always_use_this_for_non_static_field_access=true
cleanup.always_use_this_for_non_static_method_access=true
cleanup.array_with_curly=false
cleanup.arrays_fill=false
cleanup.bitwise_conditional_expression=false
cleanup.boolean_literal=false
cleanup.boolean_value_rather_than_comparison=true
cleanup.break_loop=false
cleanup.collection_cloning=false
cleanup.comparing_on_criteria=false
cleanup.comparison_statement=false
cleanup.controlflow_merge=false
cleanup.convert_functional_interfaces=false
cleanup.convert_to_enhanced_for_loop=true
cleanup.convert_to_enhanced_for_loop_if_loop_var_used=true
cleanup.convert_to_switch_expressions=false
cleanup.correct_indentation=true
cleanup.do_while_rather_than_while=true
cleanup.double_negation=false
cleanup.else_if=false
cleanup.embedded_if=false
cleanup.evaluate_nullable=false
cleanup.extract_increment=false
cleanup.format_source_code=true
cleanup.format_source_code_changes_only=false
cleanup.hash=false
cleanup.if_condition=false
cleanup.insert_inferred_type_arguments=false
cleanup.instanceof=false
cleanup.instanceof_keyword=false
cleanup.invert_equals=false
cleanup.join=false
cleanup.lazy_logical_operator=false
cleanup.make_local_variable_final=true
cleanup.make_parameters_final=true
cleanup.make_private_fields_final=true
cleanup.make_type_abstract_if_missing_method=false
cleanup.make_variable_declarations_final=true
cleanup.map_cloning=false
cleanup.merge_conditional_blocks=false
cleanup.multi_catch=false
cleanup.never_use_blocks=false
cleanup.never_use_parentheses_in_expressions=true
cleanup.no_string_creation=false
cleanup.no_super=false
cleanup.number_suffix=false
cleanup.objects_equals=false
cleanup.one_if_rather_than_duplicate_blocks_that_fall_through=true
cleanup.operand_factorization=false
cleanup.organize_imports=true
cleanup.overridden_assignment=false
cleanup.plain_replacement=false
cleanup.precompile_regex=false
cleanup.primitive_comparison=false
cleanup.primitive_parsing=false
cleanup.primitive_rather_than_wrapper=true
cleanup.primitive_serialization=false
cleanup.pull_out_if_from_if_else=false
cleanup.pull_up_assignment=false
cleanup.push_down_negation=false
cleanup.qualify_static_field_accesses_with_declaring_class=false
cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.reduce_indentation=false
cleanup.redundant_comparator=false
cleanup.redundant_falling_through_block_end=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
cleanup.remove_trailing_whitespaces_ignore_empty=false
cleanup.remove_unnecessary_array_creation=false
cleanup.remove_unnecessary_casts=true
cleanup.remove_unnecessary_nls_tags=true
cleanup.remove_unused_imports=true
cleanup.remove_unused_local_variables=false
cleanup.remove_unused_private_fields=true
cleanup.remove_unused_private_members=false
cleanup.remove_unused_private_methods=true
cleanup.remove_unused_private_types=true
cleanup.return_expression=false
cleanup.simplify_lambda_expression_and_method_ref=false
cleanup.single_used_field=false
cleanup.sort_members=false
cleanup.sort_members_all=false
cleanup.standard_comparison=false
cleanup.static_inner_class=false
cleanup.strictly_equal_or_different=false
cleanup.stringbuffer_to_stringbuilder=false
cleanup.stringbuilder=false
cleanup.stringbuilder_for_local_vars=true
cleanup.substring=false
cleanup.switch=false
cleanup.system_property=false
cleanup.system_property_boolean=false
cleanup.system_property_file_encoding=false
cleanup.system_property_file_separator=false
cleanup.system_property_line_separator=false
cleanup.system_property_path_separator=false
cleanup.ternary_operator=false
cleanup.try_with_resource=false
cleanup.unlooped_while=false
cleanup.unreachable_block=false
cleanup.use_anonymous_class_creation=false
cleanup.use_autoboxing=false
cleanup.use_blocks=true
cleanup.use_blocks_only_for_return_and_throw=false
cleanup.use_directly_map_method=false
cleanup.use_lambda=true
cleanup.use_parentheses_in_expressions=true
cleanup.use_string_is_blank=false
cleanup.use_this_for_non_static_field_access=true
cleanup.use_this_for_non_static_field_access_only_if_necessary=false
cleanup.use_this_for_non_static_method_access=true
cleanup.use_this_for_non_static_method_access_only_if_necessary=false
cleanup.use_unboxing=false
cleanup.use_var=false
cleanup.useless_continue=false
cleanup.useless_return=false
cleanup.valueof_rather_than_instantiation=false
cleanup_profile=_CAU-SE-Style
cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99
sp_cleanup.add_all=false
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
sp_cleanup.add_missing_deprecated_annotations=true
sp_cleanup.add_missing_methods=false
sp_cleanup.add_missing_nls_tags=false
sp_cleanup.add_missing_override_annotations=true
sp_cleanup.add_missing_override_annotations_interface_methods=true
sp_cleanup.add_serial_version_id=false
sp_cleanup.always_use_blocks=true
sp_cleanup.always_use_parentheses_in_expressions=false
sp_cleanup.always_use_this_for_non_static_field_access=true
sp_cleanup.always_use_this_for_non_static_method_access=true
sp_cleanup.array_with_curly=false
sp_cleanup.arrays_fill=false
sp_cleanup.bitwise_conditional_expression=false
sp_cleanup.boolean_literal=false
sp_cleanup.boolean_value_rather_than_comparison=false
sp_cleanup.break_loop=false
sp_cleanup.collection_cloning=false
sp_cleanup.comparing_on_criteria=false
sp_cleanup.comparison_statement=false
sp_cleanup.controlflow_merge=false
sp_cleanup.convert_functional_interfaces=false
sp_cleanup.convert_to_enhanced_for_loop=false
sp_cleanup.convert_to_enhanced_for_loop_if_loop_var_used=false
sp_cleanup.convert_to_switch_expressions=false
sp_cleanup.correct_indentation=false
sp_cleanup.do_while_rather_than_while=false
sp_cleanup.double_negation=false
sp_cleanup.else_if=false
sp_cleanup.embedded_if=false
sp_cleanup.evaluate_nullable=false
sp_cleanup.extract_increment=false
sp_cleanup.format_source_code=true
sp_cleanup.format_source_code_changes_only=false
sp_cleanup.hash=false
sp_cleanup.if_condition=false
sp_cleanup.insert_inferred_type_arguments=false
sp_cleanup.instanceof=false
sp_cleanup.instanceof_keyword=false
sp_cleanup.invert_equals=false
sp_cleanup.join=false
sp_cleanup.lazy_logical_operator=false
sp_cleanup.make_local_variable_final=true
sp_cleanup.make_parameters_final=false
sp_cleanup.make_private_fields_final=true
sp_cleanup.make_type_abstract_if_missing_method=false
sp_cleanup.make_variable_declarations_final=true
sp_cleanup.map_cloning=false
sp_cleanup.merge_conditional_blocks=false
sp_cleanup.multi_catch=false
sp_cleanup.never_use_blocks=false
sp_cleanup.never_use_parentheses_in_expressions=true
sp_cleanup.no_string_creation=false
sp_cleanup.no_super=false
sp_cleanup.number_suffix=false
sp_cleanup.objects_equals=false
sp_cleanup.on_save_use_additional_actions=true
sp_cleanup.one_if_rather_than_duplicate_blocks_that_fall_through=false
sp_cleanup.operand_factorization=false
sp_cleanup.organize_imports=true
sp_cleanup.overridden_assignment=false
sp_cleanup.plain_replacement=false
sp_cleanup.precompile_regex=false
sp_cleanup.primitive_comparison=false
sp_cleanup.primitive_parsing=false
sp_cleanup.primitive_rather_than_wrapper=false
sp_cleanup.primitive_serialization=false
sp_cleanup.pull_out_if_from_if_else=false
sp_cleanup.pull_up_assignment=false
sp_cleanup.push_down_negation=false
sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.reduce_indentation=false
sp_cleanup.redundant_comparator=false
sp_cleanup.redundant_falling_through_block_end=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_redundant_modifiers=false
sp_cleanup.remove_redundant_semicolons=true
sp_cleanup.remove_redundant_type_arguments=true
sp_cleanup.remove_trailing_whitespaces=false
sp_cleanup.remove_trailing_whitespaces_all=true
sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
sp_cleanup.remove_unnecessary_array_creation=false
sp_cleanup.remove_unnecessary_casts=true
sp_cleanup.remove_unnecessary_nls_tags=false
sp_cleanup.remove_unused_imports=false
sp_cleanup.remove_unused_local_variables=false
sp_cleanup.remove_unused_private_fields=true
sp_cleanup.remove_unused_private_members=false
sp_cleanup.remove_unused_private_methods=true
sp_cleanup.remove_unused_private_types=true
sp_cleanup.return_expression=false
sp_cleanup.simplify_lambda_expression_and_method_ref=false
sp_cleanup.single_used_field=false
sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false
sp_cleanup.standard_comparison=false
sp_cleanup.static_inner_class=false
sp_cleanup.strictly_equal_or_different=false
sp_cleanup.stringbuffer_to_stringbuilder=false
sp_cleanup.stringbuilder=false
sp_cleanup.stringbuilder_for_local_vars=true
sp_cleanup.substring=false
sp_cleanup.switch=false
sp_cleanup.system_property=false
sp_cleanup.system_property_boolean=false
sp_cleanup.system_property_file_encoding=false
sp_cleanup.system_property_file_separator=false
sp_cleanup.system_property_line_separator=false
sp_cleanup.system_property_path_separator=false
sp_cleanup.ternary_operator=false
sp_cleanup.try_with_resource=true
sp_cleanup.unlooped_while=false
sp_cleanup.unreachable_block=false
sp_cleanup.use_anonymous_class_creation=false
sp_cleanup.use_autoboxing=false
sp_cleanup.use_blocks=true
sp_cleanup.use_blocks_only_for_return_and_throw=false
sp_cleanup.use_directly_map_method=false
sp_cleanup.use_lambda=true
sp_cleanup.use_parentheses_in_expressions=true
sp_cleanup.use_string_is_blank=false
sp_cleanup.use_this_for_non_static_field_access=true
sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
sp_cleanup.use_this_for_non_static_method_access=true
sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
sp_cleanup.use_unboxing=false
sp_cleanup.use_var=false
sp_cleanup.useless_continue=false
sp_cleanup.useless_return=false
sp_cleanup.valueof_rather_than_instantiation=false
...@@ -30,11 +30,11 @@ public final class Uc1BeamPipeline extends AbstractPipeline { ...@@ -30,11 +30,11 @@ public final class Uc1BeamPipeline extends AbstractPipeline {
cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
// build KafkaConsumerConfig // build KafkaConsumerConfig
final Map<String, Object> consumerConfig = buildConsumerConfig(); final Map<String, Object> consumerConfig = this.buildConsumerConfig();
// Create Pipeline transformations // Create Pipeline transformations
final KafkaActivePowerTimestampReader kafka = final KafkaActivePowerTimestampReader kafka =
new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig); new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig);
final LogKeyValue logKeyValue = new LogKeyValue(); final LogKeyValue logKeyValue = new LogKeyValue();
final MapToGson mapToGson = new MapToGson(); final MapToGson mapToGson = new MapToGson();
......
...@@ -31,7 +31,7 @@ public class StatsAggregation extends CombineFn<ActivePowerRecord, StatsAccumula ...@@ -31,7 +31,7 @@ public class StatsAggregation extends CombineFn<ActivePowerRecord, StatsAccumula
@Override @Override
public StatsAccumulator mergeAccumulators(final Iterable<StatsAccumulator> accums) { public StatsAccumulator mergeAccumulators(final Iterable<StatsAccumulator> accums) {
final StatsAccumulator merged = this.createAccumulator(); final StatsAccumulator merged = createAccumulator();
for (final StatsAccumulator accum : accums) { for (final StatsAccumulator accum : accums) {
merged.addAll(accum.snapshot()); merged.addAll(accum.snapshot());
} }
......
...@@ -14,4 +14,5 @@ public class StatsToString extends SimpleFunction<KV<String, Stats>, KV<String, ...@@ -14,4 +14,5 @@ public class StatsToString extends SimpleFunction<KV<String, Stats>, KV<String,
public KV<String, String> apply(final KV<String, Stats> kv) { public KV<String, String> apply(final KV<String, Stats> kv) {
return KV.of(kv.getKey(), kv.getValue().toString()); return KV.of(kv.getKey(), kv.getValue().toString());
} }
} }
...@@ -10,8 +10,8 @@ import titan.ccp.model.records.ActivePowerRecord; ...@@ -10,8 +10,8 @@ import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Changes the time format to us europe/paris time. * Changes the time format to us europe/paris time.
*/ */
public class MapTimeFormat extends SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey, public class MapTimeFormat
ActivePowerRecord>> { extends SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey, ActivePowerRecord>> {
private static final long serialVersionUID = -6597391279968647035L; private static final long serialVersionUID = -6597391279968647035L;
private final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); private final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
private final ZoneId zone = ZoneId.of("Europe/Paris"); private final ZoneId zone = ZoneId.of("Europe/Paris");
...@@ -21,7 +21,7 @@ public class MapTimeFormat extends SimpleFunction<KV<String, ActivePowerRecord>, ...@@ -21,7 +21,7 @@ public class MapTimeFormat extends SimpleFunction<KV<String, ActivePowerRecord>,
final KV<String, ActivePowerRecord> kv) { final KV<String, ActivePowerRecord> kv) {
final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp()); final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
return KV.of(keyFactory.createKey(kv.getValue().getIdentifier(), dateTime), return KV.of(this.keyFactory.createKey(kv.getValue().getIdentifier(), dateTime),
kv.getValue()); kv.getValue());
} }
} }
package application; package application;
import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import java.util.Map; import java.util.Map;
import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CoderRegistry;
...@@ -16,8 +18,6 @@ import org.apache.beam.sdk.values.KV; ...@@ -16,8 +18,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration; import org.joda.time.Duration;
import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import theodolite.commons.beam.AbstractPipeline; import theodolite.commons.beam.AbstractPipeline;
import theodolite.commons.beam.ConfigurationKeys; import theodolite.commons.beam.ConfigurationKeys;
import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
......
package application; package application;
import java.util.Optional; import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord; import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy; import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
...@@ -20,7 +19,6 @@ public class AggregatedActivePowerRecordEventTimePolicy ...@@ -20,7 +19,6 @@ public class AggregatedActivePowerRecordEventTimePolicy
this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE); this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
} }
@Override @Override
public Instant getTimestampForRecord(final PartitionContext ctx, public Instant getTimestampForRecord(final PartitionContext ctx,
final KafkaRecord<String, AggregatedActivePowerRecord> record) { final KafkaRecord<String, AggregatedActivePowerRecord> record) {
......
...@@ -8,8 +8,8 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; ...@@ -8,8 +8,8 @@ import titan.ccp.model.records.AggregatedActivePowerRecord;
/** /**
* Converts AggregatedActivePowerRecord to ActivePowerRecord. * Converts AggregatedActivePowerRecord to ActivePowerRecord.
*/ */
public class AggregatedToActive extends SimpleFunction<KV<String, AggregatedActivePowerRecord>, public class AggregatedToActive
KV<String, ActivePowerRecord>> { extends SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, ActivePowerRecord>> {
private static final long serialVersionUID = -8275252527964065889L; private static final long serialVersionUID = -8275252527964065889L;
......
...@@ -5,8 +5,8 @@ import org.apache.beam.sdk.values.KV; ...@@ -5,8 +5,8 @@ import org.apache.beam.sdk.values.KV;
import titan.ccp.configuration.events.Event; import titan.ccp.configuration.events.Event;
/** /**
* Filters for {@code Event.SENSOR_REGISTRY_CHANGED} and * Filters for {@code Event.SENSOR_REGISTRY_CHANGED} and {@code Event.SENSOR_REGISTRY_STATUS}
* {@code Event.SENSOR_REGISTRY_STATUS} events. * events.
*/ */
public class FilterEvents implements SerializableFunction<KV<Event, String>, Boolean> { public class FilterEvents implements SerializableFunction<KV<Event, String>, Boolean> {
private static final long serialVersionUID = -2233447357614891559L; private static final long serialVersionUID = -2233447357614891559L;
......
...@@ -20,11 +20,11 @@ import titan.ccp.model.sensorregistry.SensorRegistry; ...@@ -20,11 +20,11 @@ import titan.ccp.model.sensorregistry.SensorRegistry;
*/ */
public class GenerateParentsFn extends DoFn<KV<Event, String>, KV<String, Set<String>>> { public class GenerateParentsFn extends DoFn<KV<Event, String>, KV<String, Set<String>>> {
private static final long serialVersionUID = 958270648688932091L; private static final long serialVersionUID = 958270648688932091L;
/** /**
* Transforms a parent [children] map of sensors to a child [parents] map. * Transforms a parent [children] map of sensors to a child [parents] map.
*
* @param kv input map. * @param kv input map.
* @param out outputstream. * @param out outputstream.
*/ */
......
...@@ -15,7 +15,6 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; ...@@ -15,7 +15,6 @@ import titan.ccp.model.records.AggregatedActivePowerRecord;
public class RecordAggregation public class RecordAggregation
extends CombineFn<ActivePowerRecord, RecordAggregation.Accum, AggregatedActivePowerRecord> { extends CombineFn<ActivePowerRecord, RecordAggregation.Accum, AggregatedActivePowerRecord> {
private static final long serialVersionUID = 4362213539553233529L; private static final long serialVersionUID = 4362213539553233529L;
/** /**
......
...@@ -7,8 +7,8 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; ...@@ -7,8 +7,8 @@ import titan.ccp.model.records.AggregatedActivePowerRecord;
/** /**
* Sets the identifier for new {@link AggregatedActivePowerRecord}. * Sets the identifier for new {@link AggregatedActivePowerRecord}.
*/ */
public class SetIdForAggregated extends SimpleFunction<KV<String, AggregatedActivePowerRecord>, public class SetIdForAggregated extends
KV<String, AggregatedActivePowerRecord>> { SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, AggregatedActivePowerRecord>> { // NOCS
private static final long serialVersionUID = 2148522605294086982L; private static final long serialVersionUID = 2148522605294086982L;
@Override @Override
......
...@@ -7,8 +7,8 @@ import titan.ccp.model.records.ActivePowerRecord; ...@@ -7,8 +7,8 @@ import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Set the Key for a group of {@code ActivePowerRecords} to their Parent. * Set the Key for a group of {@code ActivePowerRecords} to their Parent.
*/ */
public class SetKeyToGroup extends SimpleFunction<KV<SensorParentKey, public class SetKeyToGroup
ActivePowerRecord>, KV<String, ActivePowerRecord>> { extends SimpleFunction<KV<SensorParentKey, ActivePowerRecord>, KV<String, ActivePowerRecord>> {
private static final long serialVersionUID = 790215050768527L; private static final long serialVersionUID = 790215050768527L;
......
package application; // NOPMD package application; // NOPMD
import com.google.common.math.StatsAccumulator;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
...@@ -30,7 +31,6 @@ import org.apache.commons.configuration2.Configuration; ...@@ -30,7 +31,6 @@ import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration; import org.joda.time.Duration;
import com.google.common.math.StatsAccumulator;
import serialization.AggregatedActivePowerRecordCoder; import serialization.AggregatedActivePowerRecordCoder;
import serialization.AggregatedActivePowerRecordDeserializer; import serialization.AggregatedActivePowerRecordDeserializer;
import serialization.AggregatedActivePowerRecordSerializer; import serialization.AggregatedActivePowerRecordSerializer;
......
...@@ -20,6 +20,7 @@ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<Str ...@@ -20,6 +20,7 @@ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<Str
/** /**
* Match the changes accordingly. * Match the changes accordingly.
*
* @param kv the sensor parents set that contains the changes. * @param kv the sensor parents set that contains the changes.
*/ */
@ProcessElement @ProcessElement
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment