From 2a2958171debd99e6c7ca75ff8c055d29f368516 Mon Sep 17 00:00:00 2001 From: lorenz <stu203404@mail.uni-kiel.de> Date: Thu, 21 Oct 2021 14:52:49 +0200 Subject: [PATCH] Refactor uc1-beam for checkstyle, pmd, spotbugs Co-authored-by: Jan Bensien <stu128012@mail.uni-kiel.de> --- .../java/application/Uc1ApplicationBeam.java | 73 +++++++++++++------ .../java/application/Uc1ApplicationBeam.java | 11 +-- 2 files changed, 56 insertions(+), 28 deletions(-) diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java index f38c42591..081cbaedf 100644 --- a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java +++ b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java @@ -20,6 +20,8 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import titan.ccp.model.records.ActivePowerRecord; /** @@ -30,25 +32,46 @@ import titan.ccp.model.records.ActivePowerRecord; * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard * Input Output in Common in the Run Configuration Start via Eclipse Run. */ -public class Uc1ApplicationBeam { - @SuppressWarnings({"unchecked", "rawtypes", "serial"}) +public final class Uc1ApplicationBeam { + private static final Logger LOGGER = LoggerFactory.getLogger(Uc1ApplicationBeam.class); + private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS"; + private static final String INPUT = "INPUT"; + private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL"; + private static final String YES = "true"; + private static final String USE_AVRO_READER = YES; + private static final String AUTO_COMMIT_CONFIG = YES; + + /** + * Private constructor to avoid instantiation. + */ + private Uc1ApplicationBeam() { + throw new UnsupportedOperationException(); + } + + /** + * Main method. + * + */ + @SuppressWarnings({"unchecked", "rawtypes","unused"}) public static void main(final String[] args) { + // Set Configuration for Kafka final String bootstrapServer = - System.getenv("KAFKA_BOOTSTRAP_SERVERS") != null ? System.getenv("KAFKA_BOOTSTRAP_SERVERS") - : "my-confluent-cp-kafka:9092"; - final String inputTopic = System.getenv("INPUT") != null ? System.getenv("INPUT") : "input"; - final String schemaRegistryURL = - System.getenv("SCHEMA_REGISTRY_URL") != null ? System.getenv("SCHEMA_REGISTRY_URL") - : "http://my-confluent-cp-schema-registry:8081"; + System.getenv(BOOTSTRAP) == null ? "my-confluent-cp-kafka:9092" + : System.getenv(BOOTSTRAP); + final String inputTopic = System.getenv(INPUT) == null ? "input" : System.getenv(INPUT); + final String schemaRegistryUrl = + System.getenv(SCHEMA_REGISTRY) == null ? "http://my-confluent-cp-schema-registry:8081" + : System.getenv(SCHEMA_REGISTRY); // Set consumer configuration for the schema registry and commits back to Kafka final HashMap<String, Object> consumerConfig = new HashMap<>(); - consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG); consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerConfig.put("schema.registry.url", schemaRegistryURL); - consumerConfig.put("specific.avro.reader", "true"); + consumerConfig.put("schema.registry.url", schemaRegistryUrl); + consumerConfig.put("specific.avro.reader", USE_AVRO_READER); consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application"); + final LogKeyValue logKeyValue = new LogKeyValue(); final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); options.setJobName("ucapplication"); @@ -70,6 +93,7 @@ public class Uc1ApplicationBeam { AvroCoder.of(ActivePowerRecord.class)) .withConsumerConfigUpdates(consumerConfig) .withoutMetadata(); + // Apply pipeline transformations // Read from Kafka pipeline.apply(kafka) @@ -77,7 +101,7 @@ public class Uc1ApplicationBeam { .apply(MapElements .via( new SimpleFunction<KV<String, ActivePowerRecord>, KV<String, String>>() { - transient Gson gsonObj = new Gson(); + private transient Gson gsonObj = new Gson(); @Override public KV<String, String> apply( @@ -90,18 +114,25 @@ public class Uc1ApplicationBeam { } })) // Print to console - .apply(ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() { - @ProcessElement - public void processElement(@Element final KV<String, String> kv, - final OutputReceiver<KV<String, String>> out) { - System.out.println("Key: " + kv.getKey() + "Value: " + kv.getValue()); -// out.output(kv); - } - })); - + .apply(ParDo.of(logKeyValue)); // Submit job and start execution pipeline.run().waitUntilFinish(); + } + + /** + * Logs all Key Value pairs. + */ + @SuppressWarnings({"unused"}) + private static class LogKeyValue extends DoFn<KV<String, String>,KV<String, String>> { + private static final long serialVersionUID = 4328743; + @ProcessElement + public void processElement(@Element final KV<String, String> kv, + final OutputReceiver<KV<String, String>> out) { + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Key: " + kv.getKey() + "Value: " + kv.getValue()); + } + } } } diff --git a/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1ApplicationBeam.java b/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1ApplicationBeam.java index 3b52b18d2..7434719a3 100644 --- a/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1ApplicationBeam.java +++ b/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1ApplicationBeam.java @@ -43,8 +43,6 @@ public final class Uc1ApplicationBeam { private static final String USE_AVRO_READER = YES; private static final String AUTO_COMMIT_CONFIG = YES; - - /** * Private constructor to avoid instantiation. */ @@ -81,7 +79,7 @@ public final class Uc1ApplicationBeam { // --samzaExecutionEnvironment=STANDALONE // --maxSourceParallelism=1024 - final LoggKeys logging = new LoggKeys(); + final LogKeyValue logKeyValue = new LogKeyValue(); final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); options.setJobName("ucapplication"); @@ -124,16 +122,16 @@ public final class Uc1ApplicationBeam { } })) // Print to console - .apply(ParDo.of(logging)); + .apply(ParDo.of(logKeyValue)); // Start execution pipeline.run().waitUntilFinish(); } /** - * Logs all Keys it reads. + * Logs all Logs all Key Value pairs.. */ @SuppressWarnings({"unused"}) - private static class LoggKeys extends DoFn<KV<String, String>,KV<String, String>> { + private static class LogKeyValue extends DoFn<KV<String, String>,KV<String, String>> { private static final long serialVersionUID = 4328743; @ProcessElement @@ -144,7 +142,6 @@ public final class Uc1ApplicationBeam { } } } - } -- GitLab