Skip to content
Snippets Groups Projects
Commit 3db33520 authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Add schema registry field to common kafka streams builder.

To allow using the schema registry kafka streams builder
takes an URL for using late the avor schema regitstry
factory.
parent 61239895
No related branches found
No related tags found
2 merge requests!28Use Titan CC Avro Records in UC App and Workload Generator,!13Migrate to new Titan CC records
...@@ -13,6 +13,8 @@ import titan.ccp.common.kafka.streams.PropertiesBuilder; ...@@ -13,6 +13,8 @@ import titan.ccp.common.kafka.streams.PropertiesBuilder;
public abstract class KafkaStreamsBuilder { public abstract class KafkaStreamsBuilder {
// Kafkastreams application specific // Kafkastreams application specific
protected String schemaRegistryUrl; // NOPMD for use in subclass
private String applicationName; // NOPMD private String applicationName; // NOPMD
private String applicationVersion; // NOPMD private String applicationVersion; // NOPMD
private String bootstrapServers; // NOPMD private String bootstrapServers; // NOPMD
...@@ -55,6 +57,17 @@ public abstract class KafkaStreamsBuilder { ...@@ -55,6 +57,17 @@ public abstract class KafkaStreamsBuilder {
return this; return this;
} }
/**
* Sets the URL for the schema registry.
*
* @param url The URL of the schema registry.
* @return
*/
public KafkaStreamsBuilder schemaRegistry(final String url) {
this.schemaRegistryUrl = url;
return this;
}
/** /**
* Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus
* one for using the default. * one for using the default.
...@@ -131,9 +144,10 @@ public abstract class KafkaStreamsBuilder { ...@@ -131,9 +144,10 @@ public abstract class KafkaStreamsBuilder {
*/ */
public KafkaStreams build() { public KafkaStreams build() {
// Check for required attributes for building properties. // Check for required attributes for building properties.
Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set.");
Objects.requireNonNull(this.applicationName, "Application name has not been set."); Objects.requireNonNull(this.applicationName, "Application name has not been set.");
Objects.requireNonNull(this.applicationVersion, "Application version has not been set."); Objects.requireNonNull(this.applicationVersion, "Application version has not been set.");
Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set.");
Objects.requireNonNull(this.schemaRegistryUrl, "Schema registry has not been set.");
// Create the Kafka streams instance. // Create the Kafka streams instance.
return new KafkaStreams(this.buildTopology(), this.buildProperties()); return new KafkaStreams(this.buildTopology(), this.buildProperties());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment