From dede63aad12c11b6cffd32fd58bc2e177da3f85c Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Thu, 11 Nov 2021 15:51:11 +0100
Subject: [PATCH] Small code quality fixes

---
 .../theodolite/commons/beam/AbstractBeamService.java | 12 +++---------
 .../beam/kafka/KafkaAggregatedPowerRecordReader.java | 10 +++++-----
 .../main/java/application/Uc1ApplicationBeam.java    |  7 ++-----
 3 files changed, 10 insertions(+), 19 deletions(-)

diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java
index b91f47cfa..debfd967e 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java
@@ -20,7 +20,7 @@ public class AbstractBeamService {
   // Beam Pipeline
   protected PipelineOptions options;
 
-  public AbstractBeamService(String[] args) {
+  public AbstractBeamService(final String[] args) { //NOPMD
    options = PipelineOptionsFactory.fromArgs(args).create();
    options.setJobName(APPLICATION_NAME);
  }
@@ -29,13 +29,7 @@ public class AbstractBeamService {
   /**
    * Abstract main for a Beam Service.
    */
-  public static void main(final String[] args) {
-    AbstractBeamService service = new AbstractBeamService(args);
-    service.run();
-  }
-
-  public void run() {
-  }
+  public static void main(final String[] args){} //NOPMD
 
   /**
    * Builds a simple configuration for a Kafka consumer.
@@ -43,7 +37,7 @@ public class AbstractBeamService {
    * @return the build Kafka consumer configuration.
    */
   public Properties buildConsumerConfig() {
-    Properties consumerConfig = new Properties();
+    final Properties consumerConfig = new Properties();
     consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
         CONFIG.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
     consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java
index 5976838d9..c91987804 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java
@@ -1,9 +1,7 @@
 package theodolite.commons.beam.kafka;
 
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import java.util.Map;
 import java.util.Properties;
-
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -19,15 +17,17 @@ import titan.ccp.model.records.ActivePowerRecord;
 
 public class KafkaAggregatedPowerRecordReader extends
     PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
 
+  private static final long serialVersionUID = 2603286150183186115L;
   private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
 
+
   /**
    * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
    */
   @SuppressWarnings({"unchecked", "rawtypes"})
-  public KafkaAggregatedPowerRecordReader(String bootstrapServer, String inputTopic,
-      Properties consumerConfig) {
+  public KafkaAggregatedPowerRecordReader(final String bootstrapServer, final String inputTopic,
+      final Properties consumerConfig) {
     super();
 
     // Check if boostrap server and inputTopic are defined
@@ -47,7 +47,7 @@ public class KafkaAggregatedPowerRecordReader extends
   }
 
   @Override
-  public PCollection<KV<String, ActivePowerRecord>> expand(PBegin input) {
+  public PCollection<KV<String, ActivePowerRecord>> expand(final PBegin input) {
     return input.apply(this.reader);
   }
 
diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java
index b4e1a4cef..e8707fcdf 100644
--- a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java
+++ b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java
@@ -1,13 +1,11 @@
 package application;
 
 import com.google.gson.Gson;
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import java.util.Properties;
 import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -16,7 +14,6 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import theodolite.commons.beam.AbstractBeamService;
@@ -24,6 +21,7 @@ import theodolite.commons.beam.ConfigurationKeys;
 import theodolite.commons.beam.kafka.KafkaAggregatedPowerRecordReader;
 import titan.ccp.model.records.ActivePowerRecord;
 
+
 /**
  * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To
  * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload
@@ -50,9 +48,8 @@ public final class Uc1ApplicationBeam extends AbstractBeamService {
 
   /**
    * Main method.
-   *
    */
-  @SuppressWarnings({"unchecked", "rawtypes","unused"})
+  @SuppressWarnings({"unchecked", "rawtypes", "unused"})
   public static void main(final String[] args) {
     final Uc1ApplicationBeam uc1 = new Uc1ApplicationBeam(args);
 
-- 
GitLab
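For context, a minimal sketch of how a concrete service might drive its own pipeline after this patch, since AbstractBeamService no longer provides a runnable main. The subclass name, bootstrap server, and topic below are illustrative placeholders and are not part of the patch; only AbstractBeamService, buildConsumerConfig(), the protected options field, and KafkaAggregatedPowerRecordReader come from the code shown above.

import org.apache.beam.sdk.Pipeline;
import theodolite.commons.beam.AbstractBeamService;
import theodolite.commons.beam.kafka.KafkaAggregatedPowerRecordReader;

public final class ExampleBeamService extends AbstractBeamService {

  private ExampleBeamService(final String[] args) {
    super(args);
  }

  public static void main(final String[] args) {
    final ExampleBeamService service = new ExampleBeamService(args);

    // Hypothetical bootstrap server and topic; the consumer settings come from
    // the buildConsumerConfig() helper shown in the patch.
    final KafkaAggregatedPowerRecordReader kafkaReader =
        new KafkaAggregatedPowerRecordReader(
            "localhost:9092", "input", service.buildConsumerConfig());

    // Build the pipeline from the options prepared by AbstractBeamService,
    // read the Kafka records, and block until the pipeline finishes.
    final Pipeline pipeline = Pipeline.create(service.options);
    pipeline.apply(kafkaReader);
    pipeline.run().waitUntilFinish();
  }
}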