Skip to content
Snippets Groups Projects
Commit 408d1e5d authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Delete depreciated ucXKafkaProperties classes

+ fix uc4 HazelcastFactory
parent 09e71578
No related branches found
No related tags found
1 merge request!208Add benchmark implementations for Hazelcast Jet
Pipeline #7313 passed
package rocks.theodolite.benchmarks.uc1.hazelcastjet;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
/**
* Builds a Properties object containing the needed kafka properties used for the UC1 benchmark of
* Hazelcast Jet.
*/
public class Uc1KafkaPropertiesBuilder {
private static final String TRUE = "true";
/**
* Builds Kafka Properties used for the UC1 Benchmark pipeline.
*
* @param kafkaBootstrapServerDefault Default bootstrap server if not net by environment.
* @param schemaRegistryUrlDefault Default schema registry URL if not set by environment.
* @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC1
* Pipeline.
*/
public Properties buildKafkaPropsFromEnv(final String kafkaBootstrapServerDefault,
final String schemaRegistryUrlDefault) {
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
kafkaBootstrapServerDefault);
final String schemaRegistryUrl = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
schemaRegistryUrlDefault);
// comment:
// > Could not find constant fields for all properties
// > setProperties not applicable for non string values
final Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getCanonicalName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaAvroDeserializer.class.getCanonicalName());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE);
return props;
}
}
package rocks.theodolite.benchmarks.uc2.hazelcastjet;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
/**
* Builds a read and write Properties objects containing the needed kafka properties used for the
* UC2 benchmark of Hazelcast Jet.
*/
public class Uc2KafkaPropertiesBuilder {
private static final String TRUE = "true";
/**
* Builds Kafka Properties used for the UC2 Benchmark pipeline.
*
* @param kafkaBootstrapServerDefault Default bootstrap server if not set by envrionment.
* @param schemaRegistryUrlDefault Default schema registry URL if not set by environment.
* @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC2
* Pipeline.
*/
public Properties buildKafkaReadPropsFromEnv(final String kafkaBootstrapServerDefault,
final String schemaRegistryUrlDefault) {
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
kafkaBootstrapServerDefault);
final String schemaRegistryUrl = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
schemaRegistryUrlDefault);
// comment:
// > Could not find constant fields for all properties
// > setProperties not applicable for non string values
final Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getCanonicalName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaAvroDeserializer.class.getCanonicalName());
props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE);
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
/**
* Builds Kafka Properties used for the UC2 Benchmark pipeline.
*
* @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment.
* @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC2
* Pipeline.
*/
public Properties buildKafkaWritePropsFromEnv(final String kafkaBootstrapServerDefault) {
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
kafkaBootstrapServerDefault);
final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getCanonicalName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getCanonicalName());
props.setProperty("specific.avro.writer", TRUE);
return props;
}
}
package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
/**
* Builds a read and write Properties objects containing the needed kafka properties used for the
* UC3 benchmark of Hazelcast Jet.
*/
public class Uc3KafkaPropertiesBuilder {
private static final String TRUE = "true";
/**
* Builds Kafka Properties used for the UC3 Benchmark pipeline.
*
* @param kafkaBootstrapServerDefault Default bootstrap server if not set by envrionment.
* @param schemaRegistryUrlDefault Default schema registry URL if not set by environment.
* @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC3
* Pipeline.
*/
public Properties buildKafkaReadPropsFromEnv(final String kafkaBootstrapServerDefault,
final String schemaRegistryUrlDefault) {
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
kafkaBootstrapServerDefault);
final String schemaRegistryUrl = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
schemaRegistryUrlDefault);
final Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getCanonicalName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaAvroDeserializer.class.getCanonicalName());
props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE);
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
/**
* Builds Kafka Properties used for the UC3 Benchmark pipeline.
*
* @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment.
* @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC3
* Pipeline.
*/
public Properties buildKafkaWritePropsFromEnv(final String kafkaBootstrapServerDefault) {
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
kafkaBootstrapServerDefault);
final Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getCanonicalName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getCanonicalName());
props.setProperty("specific.avro.writer", TRUE);
return props;
}
}
......@@ -4,6 +4,7 @@ import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
......@@ -205,8 +206,8 @@ public class Uc4HazelcastJetFactory {
final Properties kafkaInputReadProps =
propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault,
schemaRegistryUrlDefault, jobName,
StringSerializer.class.getCanonicalName(),
StringSerializer.class.getCanonicalName());
StringDeserializer.class.getCanonicalName(),
KafkaAvroDeserializer.class.getCanonicalName());
final Properties kafkaConfigReadProps =
propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault,
......@@ -238,9 +239,12 @@ public class Uc4HazelcastJetFactory {
public Uc4HazelcastJetFactory setWritePropertiesFromEnv(// NOPMD
final String bootstrapServersDefault, final String schemaRegistryUrlDefault) {
// Use KafkaPropertiesBuilder to build a properties object used for kafka
final Uc4KafkaPropertiesBuilder propsBuilder = new Uc4KafkaPropertiesBuilder();
final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
final Properties kafkaWriteProps =
propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault, schemaRegistryUrlDefault);
propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault,
schemaRegistryUrlDefault,
StringSerializer.class.getCanonicalName(),
KafkaAvroSerializer.class.getCanonicalName());
this.kafkaWritePropsForPipeline = kafkaWriteProps;
return this;
}
......
package rocks.theodolite.benchmarks.uc4.hazelcastjet;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializer;
/**
* Builds a read and write Properties objects containing the needed kafka properties used for the
* UC4 benchmark of Hazelcast Jet.
*/
public class Uc4KafkaPropertiesBuilder {
private static final String TRUE = "true";
private static final String AUTO_OFFSET_RESET_CONFIG = "earliest";
/**
* Builds Kafka Properties used for the UC4 Benchmark pipeline.
*
* @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment.
* @param schemaRegistryUrlDefault Default schema registry URL if not set by environment.
* @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4
* Pipeline.
*/
public Properties buildKafkaInputReadPropsFromEnv(final String kafkaBootstrapServerDefault,
final String schemaRegistryUrlDefault) {
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
kafkaBootstrapServerDefault);
final String schemaRegistryUrl = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
schemaRegistryUrlDefault);
final Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getCanonicalName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaAvroDeserializer.class.getCanonicalName());
props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE);
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
return props;
}
/**
* Builds Kafka Properties used for the UC4 Benchmark pipeline.
*
* @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment.
* @param schemaRegistryUrlDefault Default schema registry URL if not set by environment.
* @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4
* Pipeline.
*/
public Properties buildKafkaAggregationReadPropsFromEnv(final String kafkaBootstrapServerDefault,
final String schemaRegistryUrlDefault) {
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
kafkaBootstrapServerDefault);
final String schemaRegistryUrl = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
schemaRegistryUrlDefault);
final Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getCanonicalName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaAvroDeserializer.class.getCanonicalName());
props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE);
return props;
}
/**
* Builds Kafka Properties used for the UC4 Benchmark pipeline.
*
* @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment.
* @param schemaRegistryUrlDefault Default schema registry URL if not set by environment.
* @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4
* Pipeline.
*/
public Properties buildKafkaConfigReadPropsFromEnv(final String kafkaBootstrapServerDefault,
final String schemaRegistryUrlDefault) {
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
kafkaBootstrapServerDefault);
final String schemaRegistryUrl = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
schemaRegistryUrlDefault);
final Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
EventDeserializer.class.getCanonicalName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getCanonicalName());
props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE);
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
return props;
}
/**
* Builds Kafka Properties used for the UC4 Benchmark pipeline.
*
* @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment.
* @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4
* Pipeline.
*/
public Properties buildKafkaWritePropsFromEnv(final String kafkaBootstrapServerDefault,
final String schemaRegistryUrlDefault) {
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
kafkaBootstrapServerDefault);
final String schemaRegistryUrl = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
schemaRegistryUrlDefault);
final Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getCanonicalName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaAvroSerializer.class.getCanonicalName());
props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.setProperty("specific.avro.writer", TRUE);
return props;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment