From 704eb816cb6998a9f559ceb8d3c733598f49d914 Mon Sep 17 00:00:00 2001 From: lorenz <stu203404@mail.uni-kiel.de> Date: Fri, 18 Mar 2022 20:03:44 +0100 Subject: [PATCH] Align key names for Kafka configs + consistency for property oder + spellings --- .../Uc1KafkaPropertiesBuilder.java | 6 ++-- .../Uc2KafkaPropertiesBuilder.java | 21 +++++++---- .../Uc3KafkaPropertiesBuilder.java | 12 ++++--- .../Uc4KafkaPropertiesBuilder.java | 35 ++++++++++--------- 4 files changed, 44 insertions(+), 30 deletions(-) diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java index b9a6b1d0d..e753afd99 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java @@ -1,6 +1,8 @@ package rocks.theodolite.benchmarks.uc1.hazelcastjet; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import java.util.Objects; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -38,13 +40,13 @@ public class Uc1KafkaPropertiesBuilder { // > setProperties not applicable for non string values final Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.setProperty("schema.registry.url", schemaRegistryUrl); + props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getCanonicalName()); props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.setProperty("specific.avro.reader", TRUE); + props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); return props; diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2KafkaPropertiesBuilder.java index 78a4ef5b0..8c61a0339 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2KafkaPropertiesBuilder.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2KafkaPropertiesBuilder.java @@ -1,9 +1,12 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import java.util.Objects; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; @@ -38,14 +41,14 @@ public class Uc2KafkaPropertiesBuilder { // > Could not find constant fields for all properties // > setProperties not applicable for non string values final Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getCanonicalName()); - props.setProperty("specific.avro.reader", TRUE); - props.setProperty("schema.registry.url", schemaRegistryUrl); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } @@ -63,9 +66,13 @@ public class Uc2KafkaPropertiesBuilder { kafkaBootstrapServerDefault); final Properties props = new Properties(); - props.put("bootstrap.servers", kafkaBootstrapServers); // NOCS - props.put("key.serializer", StringSerializer.class.getCanonicalName()); - props.put("value.serializer", StringSerializer.class.getCanonicalName()); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + StringSerializer.class.getCanonicalName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + StringSerializer.class.getCanonicalName()); + props.setProperty("specific.avro.writer", TRUE); + return props; } diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3KafkaPropertiesBuilder.java index 67cb917e2..be1e29e24 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3KafkaPropertiesBuilder.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3KafkaPropertiesBuilder.java @@ -1,6 +1,8 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import java.util.Objects; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -36,13 +38,13 @@ public class Uc3KafkaPropertiesBuilder { schemaRegistryUrlDefault); final Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getCanonicalName()); - props.setProperty("specific.avro.reader", TRUE); - props.setProperty("schema.registry.url", schemaRegistryUrl); + props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } @@ -61,11 +63,13 @@ public class Uc3KafkaPropertiesBuilder { kafkaBootstrapServerDefault); final Properties props = new Properties(); - props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); + props.setProperty("specific.avro.writer", TRUE); + return props; } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4KafkaPropertiesBuilder.java index eb0c250b6..f655b13ed 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4KafkaPropertiesBuilder.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4KafkaPropertiesBuilder.java @@ -1,6 +1,8 @@ package rocks.theodolite.benchmarks.uc4.hazelcastjet; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import io.confluent.kafka.serializers.KafkaAvroSerializer; import java.util.Objects; import java.util.Properties; @@ -17,14 +19,13 @@ import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializ */ public class Uc4KafkaPropertiesBuilder { - private static final String SPECIFIC_AVRO_READER_CONFIG = "specific.avro.reader"; - private static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url"; private static final String TRUE = "true"; + private static final String AUTO_OFFSET_RESET_CONFIG = "earliest"; /** * Builds Kafka Properties used for the UC4 Benchmark pipeline. * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by envrionment. + * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment. * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4 * Pipeline. @@ -40,21 +41,21 @@ public class Uc4KafkaPropertiesBuilder { schemaRegistryUrlDefault); final Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getCanonicalName()); - props.setProperty(SPECIFIC_AVRO_READER_CONFIG, TRUE); - props.setProperty(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG); return props; } /** * Builds Kafka Properties used for the UC4 Benchmark pipeline. * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by envrionment. + * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment. * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4 * Pipeline. @@ -70,13 +71,13 @@ public class Uc4KafkaPropertiesBuilder { schemaRegistryUrlDefault); final Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getCanonicalName()); - props.setProperty(SPECIFIC_AVRO_READER_CONFIG, TRUE); - props.setProperty(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); return props; } @@ -84,7 +85,7 @@ public class Uc4KafkaPropertiesBuilder { /** * Builds Kafka Properties used for the UC4 Benchmark pipeline. * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by envrionment. + * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment. * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4 * Pipeline. @@ -105,9 +106,9 @@ public class Uc4KafkaPropertiesBuilder { EventDeserializer.class.getCanonicalName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName()); - props.setProperty(SPECIFIC_AVRO_READER_CONFIG, TRUE); - props.setProperty(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG); return props; } @@ -129,13 +130,13 @@ public class Uc4KafkaPropertiesBuilder { schemaRegistryUrlDefault); final Properties props = new Properties(); - props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getCanonicalName()); + props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); props.setProperty("specific.avro.writer", TRUE); - props.setProperty(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); return props; } -- GitLab