From 408d1e5d6abdf5ce4c7b88e51846af1bc6d0d6b6 Mon Sep 17 00:00:00 2001 From: lorenz <stu203404@mail.uni-kiel.de> Date: Tue, 29 Mar 2022 17:19:18 +0200 Subject: [PATCH] Delete depreciated ucXKafkaProperties classes + fix uc4 HazelcastFactory --- .../Uc1KafkaPropertiesBuilder.java | 55 ------- .../Uc2KafkaPropertiesBuilder.java | 79 ---------- .../Uc3KafkaPropertiesBuilder.java | 76 --------- .../hazelcastjet/Uc4HazelcastJetFactory.java | 12 +- .../Uc4KafkaPropertiesBuilder.java | 144 ------------------ 5 files changed, 8 insertions(+), 358 deletions(-) delete mode 100644 theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java delete mode 100644 theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2KafkaPropertiesBuilder.java delete mode 100644 theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3KafkaPropertiesBuilder.java delete mode 100644 theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4KafkaPropertiesBuilder.java diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java deleted file mode 100644 index c5d38807d..000000000 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java +++ /dev/null @@ -1,55 +0,0 @@ -package rocks.theodolite.benchmarks.uc1.hazelcastjet; - -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; - -/** - * Builds a Properties object containing the needed kafka properties used for the UC1 benchmark of - * Hazelcast Jet. - */ -public class Uc1KafkaPropertiesBuilder { - - private static final String TRUE = "true"; - - /** - * Builds Kafka Properties used for the UC1 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not net by environment. - * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC1 - * Pipeline. - */ - public Properties buildKafkaPropsFromEnv(final String kafkaBootstrapServerDefault, - final String schemaRegistryUrlDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - schemaRegistryUrlDefault); - - // comment: - // > Could not find constant fields for all properties - // > setProperties not applicable for non string values - final Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getCanonicalName()); - props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - KafkaAvroDeserializer.class.getCanonicalName()); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); - - - return props; - } - -} diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2KafkaPropertiesBuilder.java deleted file mode 100644 index 8c61a0339..000000000 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2KafkaPropertiesBuilder.java +++ /dev/null @@ -1,79 +0,0 @@ -package rocks.theodolite.benchmarks.uc2.hazelcastjet; - -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; - -/** - * Builds a read and write Properties objects containing the needed kafka properties used for the - * UC2 benchmark of Hazelcast Jet. - */ -public class Uc2KafkaPropertiesBuilder { - - private static final String TRUE = "true"; - - /** - * Builds Kafka Properties used for the UC2 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by envrionment. - * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC2 - * Pipeline. - */ - public Properties buildKafkaReadPropsFromEnv(final String kafkaBootstrapServerDefault, - final String schemaRegistryUrlDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - schemaRegistryUrlDefault); - - // comment: - // > Could not find constant fields for all properties - // > setProperties not applicable for non string values - final Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getCanonicalName()); - props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - KafkaAvroDeserializer.class.getCanonicalName()); - props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - return props; - } - - /** - * Builds Kafka Properties used for the UC2 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC2 - * Pipeline. - */ - public Properties buildKafkaWritePropsFromEnv(final String kafkaBootstrapServerDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - - final Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - StringSerializer.class.getCanonicalName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - StringSerializer.class.getCanonicalName()); - props.setProperty("specific.avro.writer", TRUE); - - return props; - } - -} diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3KafkaPropertiesBuilder.java deleted file mode 100644 index be1e29e24..000000000 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3KafkaPropertiesBuilder.java +++ /dev/null @@ -1,76 +0,0 @@ -package rocks.theodolite.benchmarks.uc3.hazelcastjet; - -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; - -/** - * Builds a read and write Properties objects containing the needed kafka properties used for the - * UC3 benchmark of Hazelcast Jet. - */ -public class Uc3KafkaPropertiesBuilder { - - private static final String TRUE = "true"; - - /** - * Builds Kafka Properties used for the UC3 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by envrionment. - * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC3 - * Pipeline. - */ - public Properties buildKafkaReadPropsFromEnv(final String kafkaBootstrapServerDefault, - final String schemaRegistryUrlDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - schemaRegistryUrlDefault); - - final Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getCanonicalName()); - props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - KafkaAvroDeserializer.class.getCanonicalName()); - props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - return props; - } - - /** - * Builds Kafka Properties used for the UC3 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC3 - * Pipeline. - */ - public Properties buildKafkaWritePropsFromEnv(final String kafkaBootstrapServerDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - - final Properties props = new Properties(); - props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - StringSerializer.class.getCanonicalName()); - props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - StringSerializer.class.getCanonicalName()); - props.setProperty("specific.avro.writer", TRUE); - - return props; - } - -} diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java index a352ec6ca..9b6aa7126 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java @@ -4,6 +4,7 @@ import com.hazelcast.jet.JetInstance; import com.hazelcast.jet.config.JobConfig; import com.hazelcast.jet.pipeline.Pipeline; import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroSerializer; import java.util.Objects; import java.util.Properties; import org.apache.kafka.common.serialization.StringDeserializer; @@ -205,8 +206,8 @@ public class Uc4HazelcastJetFactory { final Properties kafkaInputReadProps = propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault, schemaRegistryUrlDefault, jobName, - StringSerializer.class.getCanonicalName(), - StringSerializer.class.getCanonicalName()); + StringDeserializer.class.getCanonicalName(), + KafkaAvroDeserializer.class.getCanonicalName()); final Properties kafkaConfigReadProps = propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault, @@ -238,9 +239,12 @@ public class Uc4HazelcastJetFactory { public Uc4HazelcastJetFactory setWritePropertiesFromEnv(// NOPMD final String bootstrapServersDefault, final String schemaRegistryUrlDefault) { // Use KafkaPropertiesBuilder to build a properties object used for kafka - final Uc4KafkaPropertiesBuilder propsBuilder = new Uc4KafkaPropertiesBuilder(); + final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder(); final Properties kafkaWriteProps = - propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault, schemaRegistryUrlDefault); + propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault, + schemaRegistryUrlDefault, + StringSerializer.class.getCanonicalName(), + KafkaAvroSerializer.class.getCanonicalName()); this.kafkaWritePropsForPipeline = kafkaWriteProps; return this; } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4KafkaPropertiesBuilder.java deleted file mode 100644 index f655b13ed..000000000 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4KafkaPropertiesBuilder.java +++ /dev/null @@ -1,144 +0,0 @@ -package rocks.theodolite.benchmarks.uc4.hazelcastjet; - -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; -import io.confluent.kafka.serializers.KafkaAvroSerializer; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializer; - -/** - * Builds a read and write Properties objects containing the needed kafka properties used for the - * UC4 benchmark of Hazelcast Jet. - */ -public class Uc4KafkaPropertiesBuilder { - - private static final String TRUE = "true"; - private static final String AUTO_OFFSET_RESET_CONFIG = "earliest"; - - /** - * Builds Kafka Properties used for the UC4 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment. - * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4 - * Pipeline. - */ - public Properties buildKafkaInputReadPropsFromEnv(final String kafkaBootstrapServerDefault, - final String schemaRegistryUrlDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - schemaRegistryUrlDefault); - - final Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getCanonicalName()); - props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - KafkaAvroDeserializer.class.getCanonicalName()); - props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG); - return props; - } - - /** - * Builds Kafka Properties used for the UC4 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment. - * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4 - * Pipeline. - */ - public Properties buildKafkaAggregationReadPropsFromEnv(final String kafkaBootstrapServerDefault, - final String schemaRegistryUrlDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - schemaRegistryUrlDefault); - - final Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getCanonicalName()); - props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - KafkaAvroDeserializer.class.getCanonicalName()); - props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); - - return props; - } - - /** - * Builds Kafka Properties used for the UC4 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment. - * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4 - * Pipeline. - */ - public Properties buildKafkaConfigReadPropsFromEnv(final String kafkaBootstrapServerDefault, - final String schemaRegistryUrlDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - schemaRegistryUrlDefault); - - final Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - EventDeserializer.class.getCanonicalName()); - props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getCanonicalName()); - props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG); - return props; - } - - /** - * Builds Kafka Properties used for the UC4 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4 - * Pipeline. - */ - public Properties buildKafkaWritePropsFromEnv(final String kafkaBootstrapServerDefault, - final String schemaRegistryUrlDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - schemaRegistryUrlDefault); - - final Properties props = new Properties(); - props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - StringSerializer.class.getCanonicalName()); - props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - KafkaAvroSerializer.class.getCanonicalName()); - props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - props.setProperty("specific.avro.writer", TRUE); - - return props; - } - -} -- GitLab