diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/KafkaPropertiesBuilder.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/KafkaPropertiesBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..3c1783e798e1ee7cf3987700a781c41229cda742 --- /dev/null +++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/KafkaPropertiesBuilder.java @@ -0,0 +1,88 @@ +package rocks.theodolite.benchmarks.commons.hazelcastjet; + +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; +import java.util.Objects; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; + + +public class KafkaPropertiesBuilder { + + private static final String TRUE = "true"; + private static final String AUTO_OFFSET_RESET_CONFIG = "earliest"; + + + /** + * Builds Kafka Properties used for the UC4 Benchmark pipeline. + * + * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment. + * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. + * @return A Kafka Properties Object containing the values needed for a Pipeline. + */ + public Properties buildKafkaInputReadPropsFromEnv(final String kafkaBootstrapServerDefault, + final String schemaRegistryUrlDefault, + final String applicationName, + final String keyDeserializer, + final String valueDeserializer) { + + final String kafkaBootstrapServers = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), + kafkaBootstrapServerDefault); + final String schemaRegistryUrl = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), + schemaRegistryUrlDefault); + + final Properties props = new Properties(); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); + props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + keyDeserializer); + props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + valueDeserializer); + props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, TRUE); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG); + + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,applicationName); + props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,TRUE); + + return props; + } + + /** + * Builds Kafka Properties used for the UC4 Benchmark pipeline. + * + * @param kafkaBootstrapServerDefault Default bootstrap server if not set by environment. + * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4 + * Pipeline. + */ + public Properties buildKafkaWritePropsFromEnv(final String kafkaBootstrapServerDefault, + final String schemaRegistryUrlDefault, + final String keySerializer, + final String valueSerializer) { + + final String kafkaBootstrapServers = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), + kafkaBootstrapServerDefault); + final String schemaRegistryUrl = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), + schemaRegistryUrlDefault); + + final Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); + props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + keySerializer); + props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + valueSerializer); + props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + props.setProperty("specific.avro.writer", TRUE); + + return props; + } + + + + + +} diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/HistoryService.java index 4795c9e33bce4e2b9d6f6878204e9e31a3b7063d..83848261318b2e90d19f28d9ab53fdc2cf678279 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/HistoryService.java @@ -54,7 +54,7 @@ public class HistoryService { */ private void createHazelcastJetApplication() throws Exception { // NOPMD new Uc1HazelcastJetFactory() - .setPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT) + .setPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT,JOB_NAME) .setKafkaInputTopicFromEnv(KAFKA_TOPIC_DEFAULT) .buildUc1Pipeline() .buildUc1JetInstanceFromEnv(LOGGER, BOOTSTRAP_SERVER_DEFAULT, HZ_KUBERNETES_SERVICE_DNS_KEY) diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1HazelcastJetFactory.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1HazelcastJetFactory.java index d69c93f740abb9cc8353fb14bcdd44edd13faca6..4a5c5dead14e606847dc5e2ac3c95414d9f611b3 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1HazelcastJetFactory.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1HazelcastJetFactory.java @@ -3,11 +3,14 @@ package rocks.theodolite.benchmarks.uc1.hazelcastjet; import com.hazelcast.jet.JetInstance; import com.hazelcast.jet.config.JobConfig; import com.hazelcast.jet.pipeline.Pipeline; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; import java.util.Objects; import java.util.Properties; +import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.hazelcastjet.JetInstanceBuilder; +import rocks.theodolite.benchmarks.commons.hazelcastjet.KafkaPropertiesBuilder; /** * A Hazelcast Jet factory which can build a Hazelcast Jet Instance and Pipeline for the UC1 @@ -131,12 +134,16 @@ public class Uc1HazelcastJetFactory { * @return The Uc1HazelcastJetBuilder factory with set kafkaPropertiesForPipeline. */ public Uc1HazelcastJetFactory setPropertiesFromEnv(final String bootstrapServersDefault, // NOPMD - final String schemaRegistryUrlDefault) { + final String schemaRegistryUrlDefault, + final String jobName) { // Use KafkaPropertiesBuilder to build a properties object used for kafka - final Uc1KafkaPropertiesBuilder propsBuilder = new Uc1KafkaPropertiesBuilder(); + final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder(); final Properties kafkaProps = - propsBuilder.buildKafkaPropsFromEnv(bootstrapServersDefault, - schemaRegistryUrlDefault); + propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault, + schemaRegistryUrlDefault, + jobName, + StringDeserializer.class.getCanonicalName(), + KafkaAvroDeserializer.class.getCanonicalName()); this.kafkaPropertiesForPipeline = kafkaProps; return this; } diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java index e753afd999bd1dfa2e367f4d6d6189ed946420f1..c5d38807dedc1370e0fe9310136e0376a5bec46e 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1KafkaPropertiesBuilder.java @@ -20,7 +20,7 @@ public class Uc1KafkaPropertiesBuilder { /** * Builds Kafka Properties used for the UC1 Benchmark pipeline. * - * @param kafkaBootstrapServerDefault Default bootstrap server if not net by envrionment. + * @param kafkaBootstrapServerDefault Default bootstrap server if not net by environment. * @param schemaRegistryUrlDefault Default schema registry URL if not set by environment. * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC1 * Pipeline. diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java index ea2056ce0eb47515db1ebdf6c4fe0d270b861969..f382978b714fdfdff6c190339c2ed23a2e037069 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java @@ -57,8 +57,8 @@ public class HistoryService { */ private void createHazelcastJetApplication() throws Exception { // NOPMD new Uc2HazelcastJetFactory() - .setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT) - .setWritePropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT) + .setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT,JOB_NAME) + .setWritePropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT,SCHEMA_REGISTRY_URL_DEFAULT) .setKafkaInputTopicFromEnv(KAFKA_INPUT_TOPIC_DEFAULT) .setKafkaOutputTopicFromEnv(KAFKA_OUTPUT_TOPIC_DEFAULT) .setDownsampleIntervalFromEnv(DOWNSAMPLE_INTERVAL_DEFAULT_MS) diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2HazelcastJetFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2HazelcastJetFactory.java index ca4f95712b33f6916ec73ed3a20169030aee006f..143b154f3726e75d2842766b49bd2e26f57ce39b 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2HazelcastJetFactory.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2HazelcastJetFactory.java @@ -4,11 +4,15 @@ import com.google.common.math.StatsAccumulator; import com.hazelcast.jet.JetInstance; import com.hazelcast.jet.config.JobConfig; import com.hazelcast.jet.pipeline.Pipeline; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; import java.util.Objects; import java.util.Properties; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.hazelcastjet.JetInstanceBuilder; +import rocks.theodolite.benchmarks.commons.hazelcastjet.KafkaPropertiesBuilder; import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSerializer; /** @@ -176,13 +180,17 @@ public class Uc2HazelcastJetFactory { * @return The Uc2HazelcastJetBuilder factory with set kafkaReadPropertiesForPipeline. */ public Uc2HazelcastJetFactory setReadPropertiesFromEnv(// NOPMD - final String bootstrapServersDefault, - final String schemaRegistryUrlDefault) { + final String bootstrapServersDefault, + final String schemaRegistryUrlDefault, + final String jobName) { // Use KafkaPropertiesBuilder to build a properties object used for kafka - final Uc2KafkaPropertiesBuilder propsBuilder = new Uc2KafkaPropertiesBuilder(); + final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder(); final Properties kafkaReadProps = - propsBuilder.buildKafkaReadPropsFromEnv(bootstrapServersDefault, - schemaRegistryUrlDefault); + propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault, + schemaRegistryUrlDefault, + jobName, + StringDeserializer.class.getCanonicalName(), + KafkaAvroDeserializer.class.getCanonicalName()); this.kafkaReadPropsForPipeline = kafkaReadProps; return this; } @@ -195,11 +203,14 @@ public class Uc2HazelcastJetFactory { * @return The Uc2HazelcastJetBuilder factory with set kafkaWritePropertiesForPipeline. */ public Uc2HazelcastJetFactory setWritePropertiesFromEnv(// NOPMD - final String bootstrapServersDefault) { + final String bootstrapServersDefault, final String schemaRegistryUrlDefault) { // Use KafkaPropertiesBuilder to build a properties object used for kafka - final Uc2KafkaPropertiesBuilder propsBuilder = new Uc2KafkaPropertiesBuilder(); + final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder(); final Properties kafkaWriteProps = - propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault); + propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault, + schemaRegistryUrlDefault, + StringSerializer.class.getCanonicalName(), + StringSerializer.class.getCanonicalName()); this.kafkaWritePropsForPipeline = kafkaWriteProps; return this; } diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java index 9b02da5b69a3b4f7ecbfb048cd2b9cc198d51c77..ecf38bd6c6a85e6d0f1431708a69f3431aff4730 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java @@ -58,8 +58,8 @@ public class HistoryService { */ private void createHazelcastJetApplication() throws Exception { // NOPMD new Uc3HazelcastJetFactory() - .setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT) - .setWritePropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT) + .setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT, JOB_NAME) + .setWritePropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT) .setKafkaInputTopicFromEnv(KAFKA_INPUT_TOPIC_DEFAULT) .setKafkaOutputTopicFromEnv(KAFKA_OUTPUT_TOPIC_DEFAULT) .setWindowSizeInSecondsFromEnv(WINDOW_SIZE_IN_SECONDS_DEFAULT) diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3HazelcastJetFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3HazelcastJetFactory.java index 2e869056cdf8484b94554be639f3a3dbc38db39c..be6d70d27b9a868914ec5d28e84b4a90454ab56c 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3HazelcastJetFactory.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3HazelcastJetFactory.java @@ -3,11 +3,15 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet; import com.hazelcast.jet.JetInstance; import com.hazelcast.jet.config.JobConfig; import com.hazelcast.jet.pipeline.Pipeline; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; import java.util.Objects; import java.util.Properties; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.hazelcastjet.JetInstanceBuilder; +import rocks.theodolite.benchmarks.commons.hazelcastjet.KafkaPropertiesBuilder; import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey; import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKeySerializer; @@ -188,13 +192,17 @@ public class Uc3HazelcastJetFactory { // NOPMD * @return The Uc3HazelcastJetBuilder factory with set kafkaReadPropertiesForPipeline. */ public Uc3HazelcastJetFactory setReadPropertiesFromEnv(// NOPMD - final String bootstrapServersDefault, - final String schemaRegistryUrlDefault) { + final String bootstrapServersDefault, + final String schemaRegistryUrlDefault, + final String jobName) { // Use KafkaPropertiesBuilder to build a properties object used for kafka - final Uc3KafkaPropertiesBuilder propsBuilder = new Uc3KafkaPropertiesBuilder(); + final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder(); final Properties kafkaReadProps = - propsBuilder.buildKafkaReadPropsFromEnv(bootstrapServersDefault, - schemaRegistryUrlDefault); + propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault, + schemaRegistryUrlDefault, + jobName, + StringDeserializer.class.getCanonicalName(), + KafkaAvroDeserializer.class.getCanonicalName()); this.kafkaReadPropsForPipeline = kafkaReadProps; return this; } @@ -207,11 +215,14 @@ public class Uc3HazelcastJetFactory { // NOPMD * @return The Uc3HazelcastJetBuilder factory with set kafkaWritePropertiesForPipeline. */ public Uc3HazelcastJetFactory setWritePropertiesFromEnv(// NOPMD - final String bootstrapServersDefault) { + final String bootstrapServersDefault, final String schemaRegistryUrlDefault) { // Use KafkaPropertiesBuilder to build a properties object used for kafka - final Uc3KafkaPropertiesBuilder propsBuilder = new Uc3KafkaPropertiesBuilder(); + final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder(); final Properties kafkaWriteProps = - propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault); + propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault, + schemaRegistryUrlDefault, + StringSerializer.class.getCanonicalName(), + StringSerializer.class.getCanonicalName()); this.kafkaWritePropsForPipeline = kafkaWriteProps; return this; } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java index cd0b961c7bd75b5e754c16ceda573e47e085d80d..419c25fec3eeffbd9eabef4897c44b7c6e773cee 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java @@ -59,7 +59,7 @@ public class HistoryService { */ private void createHazelcastJetApplication() throws Exception { // NOPMD new Uc4HazelcastJetFactory() - .setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT) + .setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT,JOB_NAME) .setWritePropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT) .setKafkaInputTopicFromEnv(KAFKA_INPUT_TOPIC_DEFAULT) .setKafkaOutputTopicFromEnv(KAFKA_OUTPUT_TOPIC_DEFAULT) diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java index a4dc2d823f55f93e8d2257d3df189786b75d746b..a352ec6caf7267ed61713810392e31454b1efedb 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java @@ -3,11 +3,16 @@ package rocks.theodolite.benchmarks.uc4.hazelcastjet; import com.hazelcast.jet.JetInstance; import com.hazelcast.jet.config.JobConfig; import com.hazelcast.jet.pipeline.Pipeline; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; import java.util.Objects; import java.util.Properties; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.hazelcastjet.JetInstanceBuilder; +import rocks.theodolite.benchmarks.commons.hazelcastjet.KafkaPropertiesBuilder; +import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializer; import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ImmutableSensorRegistryUc4Serializer; import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKey; import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKeySerializer; @@ -15,15 +20,13 @@ import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroup; import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroupSerializer; import titan.ccp.model.sensorregistry.ImmutableSensorRegistry; - - /** * A Hazelcast Jet factory which can build a Hazelcast Jet Instance and Pipeline for the UC4 * benchmark and lets you start the Hazelcast Jet job. The JetInstance can be built directly as the * Hazelcast Config is managed internally. In order to build the Pipeline, you first have to build - * the Read and Write Propertiesand set the input, output, and configuration topic. This can be done - * using internal functions of this factory. Outside data only refers to custom values or default - * values in case data of the environment cannot the fetched. + * the Read and Write Properties and set the input, output, and configuration topic. This can be + * done using internal functions of this factory. Outside data only refers to custom values or + * default values in case data of the environment cannot the fetched. */ public class Uc4HazelcastJetFactory { @@ -193,19 +196,32 @@ public class Uc4HazelcastJetFactory { * @return The Uc4HazelcastJetBuilder factory with set kafkaReadPropertiesForPipeline. */ public Uc4HazelcastJetFactory setReadPropertiesFromEnv(// NOPMD - final String bootstrapServersDefault, - final String schemaRegistryUrlDefault) { + final String bootstrapServersDefault, + final String schemaRegistryUrlDefault, + final String jobName) { // Use KafkaPropertiesBuilder to build a properties object used for kafka - final Uc4KafkaPropertiesBuilder propsBuilder = new Uc4KafkaPropertiesBuilder(); + final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder(); + final Properties kafkaInputReadProps = propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault, - schemaRegistryUrlDefault); + schemaRegistryUrlDefault, jobName, + StringSerializer.class.getCanonicalName(), + StringSerializer.class.getCanonicalName()); + final Properties kafkaConfigReadProps = - propsBuilder.buildKafkaConfigReadPropsFromEnv(bootstrapServersDefault, - schemaRegistryUrlDefault); + propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault, + schemaRegistryUrlDefault, + jobName, + EventDeserializer.class.getCanonicalName(), + StringDeserializer.class.getCanonicalName()); + final Properties kafkaAggregationReadProps = - propsBuilder.buildKafkaAggregationReadPropsFromEnv(bootstrapServersDefault, - schemaRegistryUrlDefault); + propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault, + schemaRegistryUrlDefault, + jobName, + StringDeserializer.class.getCanonicalName(), + KafkaAvroDeserializer.class.getCanonicalName()); + this.kafkaInputReadPropsForPipeline = kafkaInputReadProps; this.kafkaConfigPropsForPipeline = kafkaConfigReadProps; this.kafkaFeedbackPropsForPipeline = kafkaAggregationReadProps;