diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java deleted file mode 100644 index c5d38807dedc1370e0fe9310136e0376a5bec46e..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java +++ /dev/null @@ -1,55 +0,0 @@ -package rocks.theodolite.benchmarks.uc1.hazelcastjet; - -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; - -/** - * Builds a Properties object containing the needed kafka properties used for the UC1 benchmark of - * Hazelcast Jet. - */ -public class Uc1KafkaPropertiesBuilder { - - private static final String TRUE = "true"; - - /** - * Builds Kafka Properties used for the UC1 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not net by environment. - * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC1 - * Pipeline. - */ - public Properties buildKafkaPropsFromEnv(final String kafkaBootstrapServerDefault, - final String schemaRegistryUrlDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - schemaRegistryUrlDefault); - - // comment: - // > Could not find constant fields for all properties - // > setProperties not applicable for non string values - final Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getCanonicalName()); - props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - KafkaAvroDeserializer.class.getCanonicalName()); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); - - - return props; - } - -} diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2KafkaPropertiesBuilder.java deleted file mode 100644 index 8c61a0339e177c28b4f7f9182ce820b100bb9e9a..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2KafkaPropertiesBuilder.java +++ /dev/null @@ -1,79 +0,0 @@ -package rocks.theodolite.benchmarks.uc2.hazelcastjet; - -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; - -/** - * Builds a read and write Properties objects containing the needed kafka properties used for the - * UC2 benchmark of Hazelcast Jet. - */ -public class Uc2KafkaPropertiesBuilder { - - private static final String TRUE = "true"; - - /** - * Builds Kafka Properties used for the UC2 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by envrionment. - * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC2 - * Pipeline. - */ - public Properties buildKafkaReadPropsFromEnv(final String kafkaBootstrapServerDefault, - final String schemaRegistryUrlDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - schemaRegistryUrlDefault); - - // comment: - // > Could not find constant fields for all properties - // > setProperties not applicable for non string values - final Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getCanonicalName()); - props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - KafkaAvroDeserializer.class.getCanonicalName()); - props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - return props; - } - - /** - * Builds Kafka Properties used for the UC2 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC2 - * Pipeline. - */ - public Properties buildKafkaWritePropsFromEnv(final String kafkaBootstrapServerDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - - final Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - StringSerializer.class.getCanonicalName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - StringSerializer.class.getCanonicalName()); - props.setProperty("specific.avro.writer", TRUE); - - return props; - } - -} diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3KafkaPropertiesBuilder.java deleted file mode 100644 index be1e29e247d150a1cc266856c5800175de16aefa..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3KafkaPropertiesBuilder.java +++ /dev/null @@ -1,76 +0,0 @@ -package rocks.theodolite.benchmarks.uc3.hazelcastjet; - -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; - -/** - * Builds a read and write Properties objects containing the needed kafka properties used for the - * UC3 benchmark of Hazelcast Jet. - */ -public class Uc3KafkaPropertiesBuilder { - - private static final String TRUE = "true"; - - /** - * Builds Kafka Properties used for the UC3 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by envrionment. - * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC3 - * Pipeline. - */ - public Properties buildKafkaReadPropsFromEnv(final String kafkaBootstrapServerDefault, - final String schemaRegistryUrlDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - schemaRegistryUrlDefault); - - final Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getCanonicalName()); - props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - KafkaAvroDeserializer.class.getCanonicalName()); - props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - return props; - } - - /** - * Builds Kafka Properties used for the UC3 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC3 - * Pipeline. - */ - public Properties buildKafkaWritePropsFromEnv(final String kafkaBootstrapServerDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - - final Properties props = new Properties(); - props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - StringSerializer.class.getCanonicalName()); - props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - StringSerializer.class.getCanonicalName()); - props.setProperty("specific.avro.writer", TRUE); - - return props; - } - -} diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java index a352ec6caf7267ed61713810392e31454b1efedb..9b6aa71267150296d8b65268b1922925b7ada796 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java @@ -4,6 +4,7 @@ import com.hazelcast.jet.JetInstance; import com.hazelcast.jet.config.JobConfig; import com.hazelcast.jet.pipeline.Pipeline; import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroSerializer; import java.util.Objects; import java.util.Properties; import org.apache.kafka.common.serialization.StringDeserializer; @@ -205,8 +206,8 @@ public class Uc4HazelcastJetFactory { final Properties kafkaInputReadProps = propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault, schemaRegistryUrlDefault, jobName, - StringSerializer.class.getCanonicalName(), - StringSerializer.class.getCanonicalName()); + StringDeserializer.class.getCanonicalName(), + KafkaAvroDeserializer.class.getCanonicalName()); final Properties kafkaConfigReadProps = propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault, @@ -238,9 +239,12 @@ public class Uc4HazelcastJetFactory { public Uc4HazelcastJetFactory setWritePropertiesFromEnv(// NOPMD final String bootstrapServersDefault, final String schemaRegistryUrlDefault) { // Use KafkaPropertiesBuilder to build a properties object used for kafka - final Uc4KafkaPropertiesBuilder propsBuilder = new Uc4KafkaPropertiesBuilder(); + final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder(); final Properties kafkaWriteProps = - propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault, schemaRegistryUrlDefault); + propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault, + schemaRegistryUrlDefault, + StringSerializer.class.getCanonicalName(), + KafkaAvroSerializer.class.getCanonicalName()); this.kafkaWritePropsForPipeline = kafkaWriteProps; return this; } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4KafkaPropertiesBuilder.java deleted file mode 100644 index f655b13ed24333ca0940706d8da569a8b6709e44..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4KafkaPropertiesBuilder.java +++ /dev/null @@ -1,144 +0,0 @@ -package rocks.theodolite.benchmarks.uc4.hazelcastjet; - -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; -import io.confluent.kafka.serializers.KafkaAvroSerializer; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializer; - -/** - * Builds a read and write Properties objects containing the needed kafka properties used for the - * UC4 benchmark of Hazelcast Jet. - */ -public class Uc4KafkaPropertiesBuilder { - - private static final String TRUE = "true"; - private static final String AUTO_OFFSET_RESET_CONFIG = "earliest"; - - /** - * Builds Kafka Properties used for the UC4 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment. - * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4 - * Pipeline. - */ - public Properties buildKafkaInputReadPropsFromEnv(final String kafkaBootstrapServerDefault, - final String schemaRegistryUrlDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - schemaRegistryUrlDefault); - - final Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getCanonicalName()); - props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - KafkaAvroDeserializer.class.getCanonicalName()); - props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG); - return props; - } - - /** - * Builds Kafka Properties used for the UC4 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment. - * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4 - * Pipeline. - */ - public Properties buildKafkaAggregationReadPropsFromEnv(final String kafkaBootstrapServerDefault, - final String schemaRegistryUrlDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - schemaRegistryUrlDefault); - - final Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getCanonicalName()); - props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - KafkaAvroDeserializer.class.getCanonicalName()); - props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); - - return props; - } - - /** - * Builds Kafka Properties used for the UC4 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment. - * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4 - * Pipeline. - */ - public Properties buildKafkaConfigReadPropsFromEnv(final String kafkaBootstrapServerDefault, - final String schemaRegistryUrlDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - schemaRegistryUrlDefault); - - final Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - EventDeserializer.class.getCanonicalName()); - props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getCanonicalName()); - props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG); - return props; - } - - /** - * Builds Kafka Properties used for the UC4 Benchmark pipeline. - * - * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment. - * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4 - * Pipeline. - */ - public Properties buildKafkaWritePropsFromEnv(final String kafkaBootstrapServerDefault, - final String schemaRegistryUrlDefault) { - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - kafkaBootstrapServerDefault); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - schemaRegistryUrlDefault); - - final Properties props = new Properties(); - props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); - props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - StringSerializer.class.getCanonicalName()); - props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - KafkaAvroSerializer.class.getCanonicalName()); - props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - props.setProperty("specific.avro.writer", TRUE); - - return props; - } - -}