From 1661623c36d17c4979a77add7b6ad7229b6008c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Sat, 19 Feb 2022 19:58:06 +0100 Subject: [PATCH] Minor code refactroing --- .../commons/beam/AbstractBeamService.java | 18 ++++++++--------- .../beam/kafka/KafkaGenericReader.java | 20 +++++++++---------- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java index 03c5ca1da..3e94fb4c8 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java @@ -8,8 +8,8 @@ import org.slf4j.LoggerFactory; import titan.ccp.common.configuration.ServiceConfigurations; /** - * Abstraction of a Beam microservice. - * Encapsulates the corresponding {@link PipelineOptions} and the beam Runner. + * Abstraction of a Beam microservice. Encapsulates the corresponding {@link PipelineOptions} and + * the beam Runner. */ public class AbstractBeamService { @@ -20,26 +20,24 @@ public class AbstractBeamService { // Application Configurations private final Configuration config = ServiceConfigurations.createWithDefaults(); - private final String applicationName = - config.getString(ConfigurationKeys.APPLICATION_NAME); - + private final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); /** * Creates AbstractBeamService with options. */ - public AbstractBeamService(final String[] args) { //NOPMD + public AbstractBeamService(final String[] args) { // NOPMD super(); LOGGER.info("Pipeline options:"); for (final String s : args) { LOGGER.info("{}", s); } - options = PipelineOptionsFactory.fromArgs(args).create(); - options.setJobName(applicationName); - LOGGER.info("Starting BeamService with PipelineOptions {}:", this.options.toString()); + this.options = PipelineOptionsFactory.fromArgs(args).create(); + this.options.setJobName(this.applicationName); + LOGGER.info("Starting BeamService with PipelineOptions: {}", this.options.toString()); } public Configuration getConfig() { - return config; + return this.config; } } diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java index 83336b5a4..e513c3a0e 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java @@ -6,6 +6,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.kafka.common.serialization.Deserializer; /** * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}. @@ -13,8 +14,7 @@ import org.apache.beam.sdk.values.PCollection; * @param <K> Type of the Key. * @param <V> Type of the Value. */ -public class KafkaGenericReader<K, V> extends - PTransform<PBegin, PCollection<KV<K, V>>> { +public class KafkaGenericReader<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> { private static final long serialVersionUID = 2603286150183186115L; private final PTransform<PBegin, PCollection<KV<K, V>>> reader; @@ -22,14 +22,12 @@ public class KafkaGenericReader<K, V> extends /** * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. */ - public KafkaGenericReader(final String bootstrapServer, final String inputTopic, - final Map<String, Object> consumerConfig, - final Class<? extends - org.apache.kafka.common.serialization.Deserializer<K>> - keyDeserializer, - final Class<? extends - org.apache.kafka.common.serialization.Deserializer<V>> - valueDeserializer) { + public KafkaGenericReader( + final String bootstrapServer, + final String inputTopic, + final Map<String, Object> consumerConfig, + final Class<? extends Deserializer<K>> keyDeserializer, + final Class<? extends Deserializer<V>> valueDeserializer) { super(); // Check if boostrap server and inputTopic are defined @@ -37,7 +35,7 @@ public class KafkaGenericReader<K, V> extends throw new IllegalArgumentException("bootstrapServer or inputTopic missing"); } - reader = + this.reader = KafkaIO.<K, V>read() .withBootstrapServers(bootstrapServer) .withTopic(inputTopic) -- GitLab