From 2e87cb4721ced20b5797f1b0edf9e8902abf6db5 Mon Sep 17 00:00:00 2001 From: lorenz <stu203404@mail.uni-kiel.de> Date: Wed, 24 Nov 2021 18:38:03 +0100 Subject: [PATCH] Change KafkaConfig from Properties to HashMap --- .../main/java/theodolite/commons/beam/AbstractPipeline.java | 5 +++-- .../uc1-beam/src/main/java/application/Uc1BeamPipeline.java | 3 ++- .../uc2-beam/src/main/java/application/Uc2BeamPipeline.java | 4 +++- .../uc3-beam/src/main/java/application/Uc3BeamPipeline.java | 4 +++- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java index c75aa62d9..ba9d0f96e 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java @@ -1,5 +1,6 @@ package theodolite.commons.beam; +import java.util.HashMap; import java.util.Properties; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; @@ -30,8 +31,8 @@ public class AbstractPipeline extends Pipeline { * * @return the build configuration. */ - public Properties buildConsumerConfig() { - final Properties consumerConfig = new Properties(); + public HashMap buildConsumerConfig() { + final HashMap consumerConfig = new HashMap(); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java index dc2898263..b67b76b74 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java @@ -1,5 +1,6 @@ package application; +import java.util.HashMap; import java.util.Properties; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; @@ -34,7 +35,7 @@ public final class Uc1BeamPipeline extends AbstractPipeline { cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); // build KafkaConsumerConfig - final Properties consumerConfig = buildConsumerConfig(); + final HashMap consumerConfig = buildConsumerConfig(); // Create Pipeline transformations final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka = diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java index fa7d455e3..2971390c6 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java +++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java @@ -2,6 +2,8 @@ package application; import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; + +import java.util.HashMap; import java.util.Properties; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; @@ -48,7 +50,7 @@ public final class Uc2BeamPipeline extends AbstractPipeline { final Duration duration = Duration.standardMinutes(windowDurationMinutes); // Build kafka configuration - final Properties consumerConfig = buildConsumerConfig(); + final HashMap consumerConfig = buildConsumerConfig(); // Set Coders for Classes that will be distributed final CoderRegistry cr = this.getCoderRegistry(); diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java index 504a6f286..6964d23d8 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java @@ -2,6 +2,8 @@ package application; import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; + +import java.util.HashMap; import java.util.Properties; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; @@ -56,7 +58,7 @@ public final class Uc3BeamPipeline extends AbstractPipeline { final Duration triggerDelay = Duration.standardSeconds(triggerInterval); // Build kafka configuration - final Properties consumerConfig = buildConsumerConfig(); + final HashMap consumerConfig = buildConsumerConfig(); // Set Coders for Classes that will be distributed final CoderRegistry cr = this.getCoderRegistry(); -- GitLab