Skip to content
Snippets Groups Projects
Commit e473b475 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Extend KafkaPropertiesBuilder commons-hazelcastjet

parent 72cd1119
No related branches found
No related tags found
1 merge request!275Refactor hazelcast jet benchmarks:
......@@ -11,13 +11,81 @@ import org.apache.kafka.clients.producer.ProducerConfig;
/**
* Generalized builder for Kafka properties.
* Will always set AUTO_OFFSET_RESET_CONFIG to earliest
*
*/
public class KafkaPropertiesBuilder {
private static final String TRUE = "true";
private static final String AUTO_OFFSET_RESET_CONFIG = "earliest";
private final String specificAvroWriter = "specific.avro.writer";
private Properties readProperties;
private Properties writeProperties;
public KafkaPropertiesBuilder() {
}
/**
* Constructs a new PropertiesBuilder with defined default read and write properties.
* @param kafkaBootstrapServer default boostrap address property.
* @param schemaRegistryUrl default schema registry address property.
* @param jobName default job name property.
*/
public KafkaPropertiesBuilder(final String kafkaBootstrapServer,
final String schemaRegistryUrl,
final String jobName) {
this.writeProperties = new Properties();
this.readProperties = new Properties();
readProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
readProperties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
schemaRegistryUrl);
writeProperties.putAll(readProperties);
readProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, jobName);
readProperties.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE);
readProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
readProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, TRUE);
writeProperties.setProperty(specificAvroWriter, TRUE);
}
/**
* Returns default read properties with the defined deserializers.
* @param keyDeserializer deserializer for the key.
* @param valueDeserializer deserializer for the value.
*/
public Properties buildReadProperties(final String keyDeserializer,
final String valueDeserializer) {
final Properties props = new Properties();
props.putAll(this.readProperties);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializer);
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializer);
return props;
}
/**
* Returns default read properties with the defined Serializers.
* @param keySerializer serializer for the key.
* @param valueSerializer serializer for the value.
*/
public Properties buildWriteProperties(final String keySerializer,
final String valueSerializer) {
final Properties props = new Properties();
props.putAll(this.writeProperties);
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
keySerializer);
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
valueSerializer);
return props;
}
/**
* Builds Kafka Properties used for the UC4 Benchmark pipeline.
......@@ -87,13 +155,9 @@ public class KafkaPropertiesBuilder {
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
valueSerializer);
props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.setProperty("specific.avro.writer", TRUE);
props.setProperty(specificAvroWriter, TRUE);
return props;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment