diff --git a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java index ae2a6dafa3d36dada927d17a1ca00d2df63db78b..8c758c24444ea9c590c364063a397f9b7bfec8f9 100644 --- a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java +++ b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java @@ -13,6 +13,8 @@ import titan.ccp.common.kafka.streams.PropertiesBuilder; public abstract class KafkaStreamsBuilder { // Kafkastreams application specific + protected String schemaRegistryUrl; // NOPMD for use in subclass + private String applicationName; // NOPMD private String applicationVersion; // NOPMD private String bootstrapServers; // NOPMD @@ -55,6 +57,17 @@ public abstract class KafkaStreamsBuilder { return this; } + /** + * Sets the URL for the schema registry. + * + * @param url The URL of the schema registry. + * @return + */ + public KafkaStreamsBuilder schemaRegistry(final String url) { + this.schemaRegistryUrl = url; + return this; + } + /** * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus * one for using the default. @@ -131,9 +144,10 @@ public abstract class KafkaStreamsBuilder { */ public KafkaStreams build() { // Check for required attributes for building properties. - Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set."); Objects.requireNonNull(this.applicationName, "Application name has not been set."); Objects.requireNonNull(this.applicationVersion, "Application version has not been set."); + Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set."); + Objects.requireNonNull(this.schemaRegistryUrl, "Schema registry has not been set."); // Create the Kafka streams instance. return new KafkaStreams(this.buildTopology(), this.buildProperties());