Skip to content
Snippets Groups Projects
Commit 704eb816 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Align key names for Kafka configs

+ consistency for property oder
+ spellings
parent 3fb0bd1f
No related branches found
No related tags found
1 merge request!208Add benchmark implementations for Hazelcast Jet
Pipeline #7150 passed
package rocks.theodolite.benchmarks.uc1.hazelcastjet; package rocks.theodolite.benchmarks.uc1.hazelcastjet;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.Objects; import java.util.Objects;
import java.util.Properties; import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
...@@ -38,13 +40,13 @@ public class Uc1KafkaPropertiesBuilder { ...@@ -38,13 +40,13 @@ public class Uc1KafkaPropertiesBuilder {
// > setProperties not applicable for non string values // > setProperties not applicable for non string values
final Properties props = new Properties(); final Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.setProperty("schema.registry.url", schemaRegistryUrl); props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getCanonicalName()); StringDeserializer.class.getCanonicalName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaAvroDeserializer.class.getCanonicalName()); KafkaAvroDeserializer.class.getCanonicalName());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty("specific.avro.reader", TRUE); props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE);
return props; return props;
......
package rocks.theodolite.benchmarks.uc2.hazelcastjet; package rocks.theodolite.benchmarks.uc2.hazelcastjet;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.Objects; import java.util.Objects;
import java.util.Properties; import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
...@@ -38,14 +41,14 @@ public class Uc2KafkaPropertiesBuilder { ...@@ -38,14 +41,14 @@ public class Uc2KafkaPropertiesBuilder {
// > Could not find constant fields for all properties // > Could not find constant fields for all properties
// > setProperties not applicable for non string values // > setProperties not applicable for non string values
final Properties props = new Properties(); final Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getCanonicalName()); StringDeserializer.class.getCanonicalName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaAvroDeserializer.class.getCanonicalName()); KafkaAvroDeserializer.class.getCanonicalName());
props.setProperty("specific.avro.reader", TRUE); props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.setProperty("schema.registry.url", schemaRegistryUrl); props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE);
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props; return props;
} }
...@@ -63,9 +66,13 @@ public class Uc2KafkaPropertiesBuilder { ...@@ -63,9 +66,13 @@ public class Uc2KafkaPropertiesBuilder {
kafkaBootstrapServerDefault); kafkaBootstrapServerDefault);
final Properties props = new Properties(); final Properties props = new Properties();
props.put("bootstrap.servers", kafkaBootstrapServers); // NOCS props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.put("key.serializer", StringSerializer.class.getCanonicalName()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
props.put("value.serializer", StringSerializer.class.getCanonicalName()); StringSerializer.class.getCanonicalName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getCanonicalName());
props.setProperty("specific.avro.writer", TRUE);
return props; return props;
} }
......
package rocks.theodolite.benchmarks.uc3.hazelcastjet; package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.Objects; import java.util.Objects;
import java.util.Properties; import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
...@@ -36,13 +38,13 @@ public class Uc3KafkaPropertiesBuilder { ...@@ -36,13 +38,13 @@ public class Uc3KafkaPropertiesBuilder {
schemaRegistryUrlDefault); schemaRegistryUrlDefault);
final Properties props = new Properties(); final Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getCanonicalName()); StringDeserializer.class.getCanonicalName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaAvroDeserializer.class.getCanonicalName()); KafkaAvroDeserializer.class.getCanonicalName());
props.setProperty("specific.avro.reader", TRUE); props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.setProperty("schema.registry.url", schemaRegistryUrl); props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE);
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props; return props;
} }
...@@ -61,11 +63,13 @@ public class Uc3KafkaPropertiesBuilder { ...@@ -61,11 +63,13 @@ public class Uc3KafkaPropertiesBuilder {
kafkaBootstrapServerDefault); kafkaBootstrapServerDefault);
final Properties props = new Properties(); final Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getCanonicalName()); StringSerializer.class.getCanonicalName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getCanonicalName()); StringSerializer.class.getCanonicalName());
props.setProperty("specific.avro.writer", TRUE);
return props; return props;
} }
......
package rocks.theodolite.benchmarks.uc4.hazelcastjet; package rocks.theodolite.benchmarks.uc4.hazelcastjet;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Objects; import java.util.Objects;
import java.util.Properties; import java.util.Properties;
...@@ -17,14 +19,13 @@ import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializ ...@@ -17,14 +19,13 @@ import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializ
*/ */
public class Uc4KafkaPropertiesBuilder { public class Uc4KafkaPropertiesBuilder {
private static final String SPECIFIC_AVRO_READER_CONFIG = "specific.avro.reader";
private static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url";
private static final String TRUE = "true"; private static final String TRUE = "true";
private static final String AUTO_OFFSET_RESET_CONFIG = "earliest";
/** /**
* Builds Kafka Properties used for the UC4 Benchmark pipeline. * Builds Kafka Properties used for the UC4 Benchmark pipeline.
* *
* @param kafkaBootstrapServerDefault Default bootstrap server if not set by envrionment. * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment.
* @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment.
* @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4 * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4
* Pipeline. * Pipeline.
...@@ -40,21 +41,21 @@ public class Uc4KafkaPropertiesBuilder { ...@@ -40,21 +41,21 @@ public class Uc4KafkaPropertiesBuilder {
schemaRegistryUrlDefault); schemaRegistryUrlDefault);
final Properties props = new Properties(); final Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getCanonicalName()); StringDeserializer.class.getCanonicalName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaAvroDeserializer.class.getCanonicalName()); KafkaAvroDeserializer.class.getCanonicalName());
props.setProperty(SPECIFIC_AVRO_READER_CONFIG, TRUE); props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.setProperty(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE);
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
return props; return props;
} }
/** /**
* Builds Kafka Properties used for the UC4 Benchmark pipeline. * Builds Kafka Properties used for the UC4 Benchmark pipeline.
* *
* @param kafkaBootstrapServerDefault Default bootstrap server if not set by envrionment. * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment.
* @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment.
* @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4 * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4
* Pipeline. * Pipeline.
...@@ -70,13 +71,13 @@ public class Uc4KafkaPropertiesBuilder { ...@@ -70,13 +71,13 @@ public class Uc4KafkaPropertiesBuilder {
schemaRegistryUrlDefault); schemaRegistryUrlDefault);
final Properties props = new Properties(); final Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getCanonicalName()); StringDeserializer.class.getCanonicalName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaAvroDeserializer.class.getCanonicalName()); KafkaAvroDeserializer.class.getCanonicalName());
props.setProperty(SPECIFIC_AVRO_READER_CONFIG, TRUE); props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.setProperty(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE);
return props; return props;
} }
...@@ -84,7 +85,7 @@ public class Uc4KafkaPropertiesBuilder { ...@@ -84,7 +85,7 @@ public class Uc4KafkaPropertiesBuilder {
/** /**
* Builds Kafka Properties used for the UC4 Benchmark pipeline. * Builds Kafka Properties used for the UC4 Benchmark pipeline.
* *
* @param kafkaBootstrapServerDefault Default bootstrap server if not set by envrionment. * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment.
* @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment.
* @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4 * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4
* Pipeline. * Pipeline.
...@@ -105,9 +106,9 @@ public class Uc4KafkaPropertiesBuilder { ...@@ -105,9 +106,9 @@ public class Uc4KafkaPropertiesBuilder {
EventDeserializer.class.getCanonicalName()); EventDeserializer.class.getCanonicalName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getCanonicalName()); StringDeserializer.class.getCanonicalName());
props.setProperty(SPECIFIC_AVRO_READER_CONFIG, TRUE); props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.setProperty(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE);
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
return props; return props;
} }
...@@ -129,13 +130,13 @@ public class Uc4KafkaPropertiesBuilder { ...@@ -129,13 +130,13 @@ public class Uc4KafkaPropertiesBuilder {
schemaRegistryUrlDefault); schemaRegistryUrlDefault);
final Properties props = new Properties(); final Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getCanonicalName()); StringSerializer.class.getCanonicalName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaAvroSerializer.class.getCanonicalName()); KafkaAvroSerializer.class.getCanonicalName());
props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.setProperty("specific.avro.writer", TRUE); props.setProperty("specific.avro.writer", TRUE);
props.setProperty(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
return props; return props;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment