Skip to content
Snippets Groups Projects
Commit 2e87cb47 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Change KafkaConfig from Properties to HashMap

parent 981e72e3
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
package theodolite.commons.beam; package theodolite.commons.beam;
import java.util.HashMap;
import java.util.Properties; import java.util.Properties;
import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions;
...@@ -30,8 +31,8 @@ public class AbstractPipeline extends Pipeline { ...@@ -30,8 +31,8 @@ public class AbstractPipeline extends Pipeline {
* *
* @return the build configuration. * @return the build configuration.
*/ */
public Properties buildConsumerConfig() { public HashMap buildConsumerConfig() {
final Properties consumerConfig = new Properties(); final HashMap consumerConfig = new HashMap();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
......
package application; package application;
import java.util.HashMap;
import java.util.Properties; import java.util.Properties;
import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CoderRegistry;
...@@ -34,7 +35,7 @@ public final class Uc1BeamPipeline extends AbstractPipeline { ...@@ -34,7 +35,7 @@ public final class Uc1BeamPipeline extends AbstractPipeline {
cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
// build KafkaConsumerConfig // build KafkaConsumerConfig
final Properties consumerConfig = buildConsumerConfig(); final HashMap consumerConfig = buildConsumerConfig();
// Create Pipeline transformations // Create Pipeline transformations
final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka = final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
......
...@@ -2,6 +2,8 @@ package application; ...@@ -2,6 +2,8 @@ package application;
import com.google.common.math.Stats; import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator; import com.google.common.math.StatsAccumulator;
import java.util.HashMap;
import java.util.Properties; import java.util.Properties;
import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CoderRegistry;
...@@ -48,7 +50,7 @@ public final class Uc2BeamPipeline extends AbstractPipeline { ...@@ -48,7 +50,7 @@ public final class Uc2BeamPipeline extends AbstractPipeline {
final Duration duration = Duration.standardMinutes(windowDurationMinutes); final Duration duration = Duration.standardMinutes(windowDurationMinutes);
// Build kafka configuration // Build kafka configuration
final Properties consumerConfig = buildConsumerConfig(); final HashMap consumerConfig = buildConsumerConfig();
// Set Coders for Classes that will be distributed // Set Coders for Classes that will be distributed
final CoderRegistry cr = this.getCoderRegistry(); final CoderRegistry cr = this.getCoderRegistry();
......
...@@ -2,6 +2,8 @@ package application; ...@@ -2,6 +2,8 @@ package application;
import com.google.common.math.Stats; import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator; import com.google.common.math.StatsAccumulator;
import java.util.HashMap;
import java.util.Properties; import java.util.Properties;
import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CoderRegistry;
...@@ -56,7 +58,7 @@ public final class Uc3BeamPipeline extends AbstractPipeline { ...@@ -56,7 +58,7 @@ public final class Uc3BeamPipeline extends AbstractPipeline {
final Duration triggerDelay = Duration.standardSeconds(triggerInterval); final Duration triggerDelay = Duration.standardSeconds(triggerInterval);
// Build kafka configuration // Build kafka configuration
final Properties consumerConfig = buildConsumerConfig(); final HashMap consumerConfig = buildConsumerConfig();
// Set Coders for Classes that will be distributed // Set Coders for Classes that will be distributed
final CoderRegistry cr = this.getCoderRegistry(); final CoderRegistry cr = this.getCoderRegistry();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment