diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java index c75aa62d96da998615831274e2f762c92f50c77b..ba9d0f96e7c3b85ad1be0519f78068a645ae42f9 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java @@ -1,5 +1,6 @@ package theodolite.commons.beam; +import java.util.HashMap; import java.util.Properties; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; @@ -30,8 +31,8 @@ public class AbstractPipeline extends Pipeline { * * @return the build configuration. */ - public Properties buildConsumerConfig() { - final Properties consumerConfig = new Properties(); + public HashMap buildConsumerConfig() { + final HashMap consumerConfig = new HashMap(); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java index dc289826352b11c70c5a2858af6d1bcd9f41ff5e..b67b76b742a897c39bb50a82c00e00ffced8c6f7 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java @@ -1,5 +1,6 @@ package application; +import java.util.HashMap; import java.util.Properties; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; @@ -34,7 +35,7 @@ public final class Uc1BeamPipeline extends AbstractPipeline { cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); // build KafkaConsumerConfig - final Properties consumerConfig = buildConsumerConfig(); + final HashMap consumerConfig = buildConsumerConfig(); // Create Pipeline transformations final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka = diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java index fa7d455e3625309843cc055a39721873f8d51d8f..2971390c627b8c8765cee486860427c335acf7f2 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java +++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java @@ -2,6 +2,8 @@ package application; import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; + +import java.util.HashMap; import java.util.Properties; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; @@ -48,7 +50,7 @@ public final class Uc2BeamPipeline extends AbstractPipeline { final Duration duration = Duration.standardMinutes(windowDurationMinutes); // Build kafka configuration - final Properties consumerConfig = buildConsumerConfig(); + final HashMap consumerConfig = buildConsumerConfig(); // Set Coders for Classes that will be distributed final CoderRegistry cr = this.getCoderRegistry(); diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java index 504a6f286f26a96bfc403783b5183eb2949b1e79..6964d23d84fcc1d0f836842d8d4c146352b10e90 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java @@ -2,6 +2,8 @@ package application; import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; + +import java.util.HashMap; import java.util.Properties; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; @@ -56,7 +58,7 @@ public final class Uc3BeamPipeline extends AbstractPipeline { final Duration triggerDelay = Duration.standardSeconds(triggerInterval); // Build kafka configuration - final Properties consumerConfig = buildConsumerConfig(); + final HashMap consumerConfig = buildConsumerConfig(); // Set Coders for Classes that will be distributed final CoderRegistry cr = this.getCoderRegistry();