From 3db33520dba8d172b8342b0ab7dbf92fbd11cf90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de> Date: Thu, 28 May 2020 09:37:34 +0200 Subject: [PATCH] Add schema registry field to common kafka streams builder. To allow using the schema registry kafka streams builder takes an URL for using late the avor schema regitstry factory. --- .../kafkastreams/KafkaStreamsBuilder.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java index ae2a6dafa..8c758c244 100644 --- a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java +++ b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java @@ -13,6 +13,8 @@ import titan.ccp.common.kafka.streams.PropertiesBuilder; public abstract class KafkaStreamsBuilder { // Kafkastreams application specific + protected String schemaRegistryUrl; // NOPMD for use in subclass + private String applicationName; // NOPMD private String applicationVersion; // NOPMD private String bootstrapServers; // NOPMD @@ -55,6 +57,17 @@ public abstract class KafkaStreamsBuilder { return this; } + /** + * Sets the URL for the schema registry. + * + * @param url The URL of the schema registry. + * @return + */ + public KafkaStreamsBuilder schemaRegistry(final String url) { + this.schemaRegistryUrl = url; + return this; + } + /** * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus * one for using the default. @@ -131,9 +144,10 @@ public abstract class KafkaStreamsBuilder { */ public KafkaStreams build() { // Check for required attributes for building properties. - Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set."); Objects.requireNonNull(this.applicationName, "Application name has not been set."); Objects.requireNonNull(this.applicationVersion, "Application version has not been set."); + Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set."); + Objects.requireNonNull(this.schemaRegistryUrl, "Schema registry has not been set."); // Create the Kafka streams instance. return new KafkaStreams(this.buildTopology(), this.buildProperties()); -- GitLab