From dd22b618e0ae122f927c60e3e530fab1ed169295 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de> Date: Thu, 14 May 2020 18:20:28 +0200 Subject: [PATCH] Add a buildProperty function to KafkaStreamsBuilder in commons This allows the usecases to create its own Properties object if needed. --- .../kafkastreams/KafkaStreamsBuilder.java | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/application-kafkastreams-commons/src/main/java/spesb/commons/kafkastreams/KafkaStreamsBuilder.java b/application-kafkastreams-commons/src/main/java/spesb/commons/kafkastreams/KafkaStreamsBuilder.java index cad108881..3101225d9 100644 --- a/application-kafkastreams-commons/src/main/java/spesb/commons/kafkastreams/KafkaStreamsBuilder.java +++ b/application-kafkastreams-commons/src/main/java/spesb/commons/kafkastreams/KafkaStreamsBuilder.java @@ -112,25 +112,31 @@ public abstract class KafkaStreamsBuilder { protected abstract Topology buildTopology(); /** - * Builds the {@link KafkaStreams} instance. + * Build the {@link Properties} for a {@code KafkaStreams} application. + * + * @return A {@code Properties} object. */ - public KafkaStreams build() { - // Check for required attributes for building properties. - Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set."); - Objects.requireNonNull(this.applicationName, "Application name has not been set."); - Objects.requireNonNull(this.applicationVersion, "Application version has not been set."); - - // Build properties. - final Properties properties = PropertiesBuilder + protected Properties buildProperties() { + return PropertiesBuilder .bootstrapServers(this.bootstrapServers) .applicationId(this.applicationName + '-' + this.applicationVersion) .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) .build(); + } + + /** + * Builds the {@link KafkaStreams} instance. + */ + public KafkaStreams build() { + // Check for required attributes for building properties. + Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set."); + Objects.requireNonNull(this.applicationName, "Application name has not been set."); + Objects.requireNonNull(this.applicationVersion, "Application version has not been set."); // Create the Kafka streams instance. - return new KafkaStreams(this.buildTopology(), properties); + return new KafkaStreams(this.buildTopology(), this.buildProperties()); } } -- GitLab