From f3493a1fe260bd563e8629e0144412593cddf51a Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Thu, 25 Nov 2021 19:16:27 +0100
Subject: [PATCH] Change KafkaConsumer from HashMap to Map

Also refactor uc4-beam slightly: fix comment typos (address, Windows),
make a local variable final, and wrap an over-long line.
---
 .../theodolite/commons/beam/AbstractPipeline.java   | 11 ++++++-----
 .../beam/kafka/KafkaActivePowerRecordReader.java    |  6 ++----
 .../beam/kafka/KafkaActivePowerTimestampReader.java |  6 ++----
 .../src/main/java/application/Uc1BeamPipeline.java  |  3 ++-
 .../src/main/java/application/Uc2BeamPipeline.java  |  3 ++-
 .../src/main/java/application/Uc3BeamPipeline.java  |  3 ++-
 .../src/main/java/application/Uc4BeamPipeline.java  | 13 +++++++------
 7 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java
index 56e1239f6..03f11cc74 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java
@@ -1,6 +1,8 @@
 package theodolite.commons.beam;
 
 import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.commons.configuration2.Configuration;
@@ -11,11 +13,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
  */
 public class AbstractPipeline extends Pipeline {
 
-  // Application Configurations
-  private final Configuration config;
-
   protected final String inputTopic;
   protected final String bootstrapServer;
+  // Application Configurations
+  private final Configuration config;
 
   protected AbstractPipeline(final PipelineOptions options, final Configuration config) {
     super(options);
@@ -30,8 +31,8 @@ public class AbstractPipeline extends Pipeline {
    *
    * @return the build configuration.
    */
-  public HashMap<String, Object> buildConsumerConfig() {
-    final HashMap<String, Object> consumerConfig = new HashMap<>();
+  public Map<String, Object> buildConsumerConfig() {
+    final Map<String, Object> consumerConfig = new HashMap<>();
     consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
         config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
     consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java
index 56f5a6810..07b52be5d 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java
@@ -1,9 +1,7 @@
 package theodolite.commons.beam.kafka;
 
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-
-import java.util.HashMap;
-
+import java.util.Map;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -28,7 +26,7 @@ public class KafkaActivePowerRecordReader extends
    */
   @SuppressWarnings({"unchecked", "rawtypes"})
   public KafkaActivePowerRecordReader(final String bootstrapServer, final String inputTopic,
-                                      final HashMap consumerConfig) {
+                                      final Map consumerConfig) {
     super();
 
     // Check if boostrap server and inputTopic are defined
diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java
index 0b22745bf..fdb15865d 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java
@@ -1,6 +1,7 @@
 package theodolite.commons.beam.kafka;
 
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import java.util.Map;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -10,9 +11,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import titan.ccp.model.records.ActivePowerRecord;
 
-import java.util.HashMap;
-import java.util.Properties;
-
 /**
  * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
  * Has additional a TimestampPolicy.
@@ -29,7 +27,7 @@ public class KafkaActivePowerTimestampReader extends
    */
   @SuppressWarnings({"unchecked", "rawtypes"})
   public KafkaActivePowerTimestampReader(final String bootstrapServer, final String inputTopic,
-                                         final HashMap consumerConfig) {
+                                         final Map consumerConfig) {
     super();
 
     // Check if boostrap server and inputTopic are defined
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
index b67b76b74..992eda161 100644
--- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
@@ -1,6 +1,7 @@
 package application;
 
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.CoderRegistry;
@@ -35,7 +36,7 @@ public final class Uc1BeamPipeline extends AbstractPipeline {
     cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
 
     // build KafkaConsumerConfig
-    final HashMap consumerConfig = buildConsumerConfig();
+    final Map consumerConfig = buildConsumerConfig();
 
     // Create Pipeline transformations
     final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java
index 2971390c6..a0ba5cb08 100644
--- a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java
+++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java
@@ -4,6 +4,7 @@ import com.google.common.math.Stats;
 import com.google.common.math.StatsAccumulator;
 
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.CoderRegistry;
@@ -50,7 +51,7 @@ public final class Uc2BeamPipeline extends AbstractPipeline {
     final Duration duration = Duration.standardMinutes(windowDurationMinutes);
 
     // Build kafka configuration
-    final HashMap consumerConfig = buildConsumerConfig();
+    final Map consumerConfig = buildConsumerConfig();
 
     // Set Coders for Classes that will be distributed
     final CoderRegistry cr = this.getCoderRegistry();
diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java
index 6964d23d8..088664aef 100644
--- a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java
+++ b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java
@@ -4,6 +4,7 @@ import com.google.common.math.Stats;
 import com.google.common.math.StatsAccumulator;
 
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.CoderRegistry;
@@ -58,7 +59,7 @@ public final class Uc3BeamPipeline extends AbstractPipeline {
     final Duration triggerDelay = Duration.standardSeconds(triggerInterval);
 
     // Build kafka configuration
-    final HashMap consumerConfig = buildConsumerConfig();
+    final Map consumerConfig = buildConsumerConfig();
 
     // Set Coders for Classes that will be distributed
     final CoderRegistry cr = this.getCoderRegistry();
diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java
index cf3eb22d4..a8523220d 100644
--- a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java
@@ -29,7 +29,7 @@ import titan.ccp.model.records.AggregatedActivePowerRecord;
 /**
  * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To
  * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload
- * generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST adress
+ * generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST address
  * using--flinkMaster as run parameter. To persist logs add
  * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard
  * Input Output in Common in the Run Configuration Start via Eclipse Run.
@@ -57,8 +57,8 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
     final Duration gracePeriod = Duration.standardSeconds(grace);
 
     // Build kafka configuration
-    final HashMap<String, Object> consumerConfig = buildConsumerConfig();
-    final HashMap<String, Object> configurationConfig = configurationConfig(config);
+    final Map<String, Object> consumerConfig = buildConsumerConfig();
+    final Map<String, Object> configurationConfig = configurationConfig(config);
 
     // Set Coders for Classes that will be distributed
     final CoderRegistry cr = this.getCoderRegistry();
@@ -106,7 +106,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
                 (tp, previousWaterMark) -> new AggregatedActivePowerRecordEventTimePolicy(
                     previousWaterMark))
             .withoutMetadata())
-        .apply("Apply Winddows", Window.into(FixedWindows.of(duration)))
+        .apply("Apply Windows", Window.into(FixedWindows.of(duration)))
         // Convert into the correct data format
         .apply("Convert AggregatedActivePowerRecord to ActivePowerRecord",
             MapElements.via(aggregatedToActive))
@@ -162,13 +162,14 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
                 .accumulatingFiredPanes())
             .apply(View.asMap());
 
-    FilterNullValues filterNullValues = new FilterNullValues();
+    final FilterNullValues filterNullValues = new FilterNullValues();
 
     // Build pairs of every sensor reading and parent
     final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues =
         inputCollection.apply(
                 "Duplicate as flatMap",
-                ParDo.of(new DuplicateAsFlatMap(childParentPairMap)).withSideInputs(childParentPairMap))
+                ParDo.of(new DuplicateAsFlatMap(childParentPairMap))
+                    .withSideInputs(childParentPairMap))
             .apply("Filter only latest changes", Latest.perKey())
             .apply("Filter out null values",
                 Filter.by(filterNullValues));
-- 
GitLab