From 701d0a3a8936d003249a72cb5618af809f761889 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Sat, 26 Nov 2022 18:10:27 +0100 Subject: [PATCH] Format Hazelcast Jet code --- ...ggregatedActivePowerRecordAccumulator.java | 1 - .../uc4/hazelcastjet/HashMapSupplier.java | 2 - .../uc4/hazelcastjet/HistoryService.java | 19 ++-- .../uc4/hazelcastjet/Uc4PipelineFactory.java | 86 +++++++++---------- 4 files changed, 49 insertions(+), 59 deletions(-) diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/AggregatedActivePowerRecordAccumulator.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/AggregatedActivePowerRecordAccumulator.java index 9e25b4df7..567986ce2 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/AggregatedActivePowerRecordAccumulator.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/AggregatedActivePowerRecordAccumulator.java @@ -20,7 +20,6 @@ public class AggregatedActivePowerRecordAccumulator { // This constructor is intentionally empty. Nothing special is needed here. } - /** * Creates an AggregationObject. */ diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HashMapSupplier.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HashMapSupplier.java index ebf5e4647..61910850b 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HashMapSupplier.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HashMapSupplier.java @@ -21,6 +21,4 @@ public class HashMapSupplier implements SupplierEx<HashMap<String, Set<String>>> return this.get(); } - - } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java index 36ab0995e..f2b5cb41c 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java @@ -21,8 +21,8 @@ public class HistoryService extends HazelcastJetService { private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class); /** - * Constructs the use case logic for UC4. - * Retrieves the needed values and instantiates a pipeline factory. + * Constructs the use case logic for UC4. Retrieves the needed values and instantiates a pipeline + * factory. */ public HistoryService() { super(LOGGER); @@ -32,12 +32,12 @@ public class HistoryService extends HazelcastJetService { KafkaAvroDeserializer.class.getCanonicalName()); final Properties kafkaConfigReadProps = - propsBuilder.buildReadProperties( + this.propsBuilder.buildReadProperties( EventDeserializer.class.getCanonicalName(), StringDeserializer.class.getCanonicalName()); final Properties kafkaAggregationReadProps = - propsBuilder.buildReadProperties( + this.propsBuilder.buildReadProperties( StringDeserializer.class.getCanonicalName(), KafkaAvroDeserializer.class.getCanonicalName()); @@ -47,27 +47,26 @@ public class HistoryService extends HazelcastJetService { KafkaAvroSerializer.class.getCanonicalName()); final String kafkaOutputTopic = - config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); + this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); final String kafkaConfigurationTopic = - config.getProperty(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC).toString(); + this.config.getProperty(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC).toString(); final String kafkaFeedbackTopic = - config.getProperty(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC).toString(); + this.config.getProperty(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC).toString(); final int windowSize = Integer.parseInt( - config.getProperty(ConfigurationKeys.WINDOW_SIZE_UC4).toString()); + this.config.getProperty(ConfigurationKeys.WINDOW_SIZE_UC4).toString()); this.pipelineFactory = new Uc4PipelineFactory( kafkaProps, kafkaConfigReadProps, kafkaAggregationReadProps, kafkaWriteProps, - kafkaInputTopic, kafkaOutputTopic, kafkaConfigurationTopic, kafkaFeedbackTopic, + this.kafkaInputTopic, kafkaOutputTopic, kafkaConfigurationTopic, kafkaFeedbackTopic, windowSize); } - @Override protected void registerSerializer() { this.jobConfig.registerSerializer(ValueGroup.class, ValueGroupSerializer.class) diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java index 568e095e3..abac638ec 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java @@ -30,8 +30,7 @@ import rocks.theodolite.benchmarks.commons.model.sensorregistry.SensorRegistry; /** - * PipelineFactory for use case 4. - * Allows to build and extend pipelines. + * PipelineFactory for use case 4. Allows to build and extend pipelines. */ public class Uc4PipelineFactory extends PipelineFactory { @@ -64,17 +63,17 @@ public class Uc4PipelineFactory extends PipelineFactory { * @param windowSize The window size of the tumbling window used in this pipeline. */ public Uc4PipelineFactory(final Properties kafkaInputReadPropsForPipeline, // NOPMD - final Properties kafkaConfigPropsForPipeline, - final Properties kafkaFeedbackPropsForPipeline, - final Properties kafkaWritePropsForPipeline, - final String kafkaInputTopic, - final String kafkaOutputTopic, - final String kafkaConfigurationTopic, - final String kafkaFeedbackTopic, - final int windowSize) { + final Properties kafkaConfigPropsForPipeline, + final Properties kafkaFeedbackPropsForPipeline, + final Properties kafkaWritePropsForPipeline, + final String kafkaInputTopic, + final String kafkaOutputTopic, + final String kafkaConfigurationTopic, + final String kafkaFeedbackTopic, + final int windowSize) { super(kafkaInputReadPropsForPipeline, kafkaInputTopic, - kafkaWritePropsForPipeline,kafkaOutputTopic); + kafkaWritePropsForPipeline, kafkaOutputTopic); this.kafkaConfigPropsForPipeline = kafkaConfigPropsForPipeline; this.kafkaFeedbackPropsForPipeline = kafkaFeedbackPropsForPipeline; this.kafkaConfigurationTopic = kafkaConfigurationTopic; @@ -84,21 +83,21 @@ public class Uc4PipelineFactory extends PipelineFactory { /** * Builds a pipeline which can be used for stream processing using Hazelcast Jet. - * @return a pipeline used which can be used in a Hazelcast Jet Instance to process data - * for UC4. + * + * @return a pipeline used which can be used in a Hazelcast Jet Instance to process data for UC4. */ @Override public Pipeline buildPipeline() { // Sources for this use case final StreamSource<Entry<Event, String>> configSource = - KafkaSources.kafka(kafkaConfigPropsForPipeline, kafkaConfigurationTopic); + KafkaSources.kafka(this.kafkaConfigPropsForPipeline, this.kafkaConfigurationTopic); final StreamSource<Entry<String, ActivePowerRecord>> inputSource = - KafkaSources.kafka(kafkaReadPropsForPipeline, kafkaInputTopic); + KafkaSources.kafka(this.kafkaReadPropsForPipeline, this.kafkaInputTopic); final StreamSource<Entry<String, AggregatedActivePowerRecord>> aggregationSource = - KafkaSources.kafka(kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic); + KafkaSources.kafka(this.kafkaFeedbackPropsForPipeline, this.kafkaFeedbackTopic); // Extend UC4 topology to pipeline final StreamStage<Entry<String, AggregatedActivePowerRecord>> uc4Aggregation = @@ -106,17 +105,17 @@ public class Uc4PipelineFactory extends PipelineFactory { // Add Sink2: Write back to kafka feedback/aggregation topic uc4Aggregation.writeTo(KafkaSinks.kafka( - kafkaWritePropsForPipeline, kafkaFeedbackTopic)); + this.kafkaWritePropsForPipeline, this.kafkaFeedbackTopic)); // Log aggregation product uc4Aggregation.writeTo(Sinks.logger()); // Add Sink2: Write back to kafka output topic uc4Aggregation.writeTo(KafkaSinks.kafka( - kafkaWritePropsForPipeline, kafkaOutputTopic)); + this.kafkaWritePropsForPipeline, this.kafkaOutputTopic)); // Return the pipeline - return pipe; + return this.pipe; } @@ -147,16 +146,14 @@ public class Uc4PipelineFactory extends PipelineFactory { * according aggregated values. The data can be further modified or directly be linked to * a Hazelcast Jet sink. */ - public StreamStage // NOPMD - <Map.Entry<String, AggregatedActivePowerRecord>> - extendUc4Topology(final StreamSource<Map.Entry<String, ActivePowerRecord>> inputSource, - final StreamSource<Map.Entry<String, AggregatedActivePowerRecord>> - aggregationSource, - final StreamSource<Map.Entry<Event, String>> configurationSource) { + public StreamStage<Map.Entry<String, AggregatedActivePowerRecord>> extendUc4Topology(// NOPMD + final StreamSource<Map.Entry<String, ActivePowerRecord>> inputSource, + final StreamSource<Map.Entry<String, AggregatedActivePowerRecord>> aggregationSource, + final StreamSource<Map.Entry<Event, String>> configurationSource) { ////////////////////////////////// // (1) Configuration Stream - pipe.readFrom(configurationSource) + this.pipe.readFrom(configurationSource) .withNativeTimestamps(0) .filter(entry -> entry.getKey() == Event.SENSOR_REGISTRY_CHANGED || entry.getKey() == Event.SENSOR_REGISTRY_STATUS) @@ -169,13 +166,13 @@ public class Uc4PipelineFactory extends PipelineFactory { ////////////////////////////////// // (1) Sensor Input Stream - final StreamStage<Entry<String, ActivePowerRecord>> inputStream = pipe + final StreamStage<Entry<String, ActivePowerRecord>> inputStream = this.pipe .readFrom(inputSource) .withNativeTimestamps(0); ////////////////////////////////// // (1) Aggregation Stream - final StreamStage<Entry<String, ActivePowerRecord>> aggregations = pipe + final StreamStage<Entry<String, ActivePowerRecord>> aggregations = this.pipe .readFrom(aggregationSource) .withNativeTimestamps(0) .map(entry -> { // Map Aggregated to ActivePowerRecord @@ -187,10 +184,10 @@ public class Uc4PipelineFactory extends PipelineFactory { ////////////////////////////////// // (2) UC4 Merge Input with aggregation stream - final StreamStageWithKey<Entry<String, ActivePowerRecord>, String> - mergedInputAndAggregations = inputStream - .merge(aggregations) - .groupingKey(Entry::getKey); + final StreamStageWithKey<Entry<String, ActivePowerRecord>, String> mergedInputAndAggregations = + inputStream + .merge(aggregations) + .groupingKey(Entry::getKey); ////////////////////////////////// // (3) UC4 Join Configuration and Merges Input/Aggregation Stream @@ -232,28 +229,25 @@ public class Uc4PipelineFactory extends PipelineFactory { ////////////////////////////////// // (5) UC4 Last Value Map // Table with tumbling window differentiation [ (sensorKey,Group) , value ],Time - final StageWithWindow<Entry<SensorGroupKey, ActivePowerRecord>> - windowedLastValues = dupliAsFlatmappedStage - .window(WindowDefinition.tumbling(windowSize)); + final StageWithWindow<Entry<SensorGroupKey, ActivePowerRecord>> windowedLastValues = + dupliAsFlatmappedStage + .window(WindowDefinition.tumbling(this.windowSize)); - final AggregateOperation1<Entry<SensorGroupKey, ActivePowerRecord>, - AggregatedActivePowerRecordAccumulator, AggregatedActivePowerRecord> aggrOp = + final AggregateOperation1<Entry<SensorGroupKey, ActivePowerRecord>, AggregatedActivePowerRecordAccumulator, AggregatedActivePowerRecord> aggrOp = // NOCS AggregateOperation .withCreate(AggregatedActivePowerRecordAccumulator::new) .<Entry<SensorGroupKey, ActivePowerRecord>>andAccumulate((acc, rec) -> { acc.setId(rec.getKey().getGroup()); acc.addInputs(rec.getValue()); }) - .andCombine((acc, acc2) -> - acc.addInputs(acc2.getId(), acc2.getSumInW(), acc2.getCount(), acc.getTimestamp())) + .andCombine((acc, acc2) -> acc.addInputs(acc2.getId(), acc2.getSumInW(), + acc2.getCount(), acc.getTimestamp())) .andDeduct((acc, acc2) -> acc.removeInputs(acc2.getSumInW(), acc2.getCount())) - .andExportFinish(acc -> - new AggregatedActivePowerRecord(acc.getId(), - acc.getTimestamp(), - acc.getCount(), - acc.getSumInW(), - acc.getAverageInW()) - ); + .andExportFinish(acc -> new AggregatedActivePowerRecord(acc.getId(), + acc.getTimestamp(), + acc.getCount(), + acc.getSumInW(), + acc.getAverageInW())); // write aggregation back to kafka -- GitLab