diff --git a/application-kafkastreams-commons/src/main/java/spesb/commons/kafkastreams/KafkaStreamsBuilder.java b/application-kafkastreams-commons/src/main/java/spesb/commons/kafkastreams/KafkaStreamsBuilder.java index cad108881bf9170656b2962f8fe3381899c15af9..3101225d992476e3e9f29b141542609d887c6259 100644 --- a/application-kafkastreams-commons/src/main/java/spesb/commons/kafkastreams/KafkaStreamsBuilder.java +++ b/application-kafkastreams-commons/src/main/java/spesb/commons/kafkastreams/KafkaStreamsBuilder.java @@ -112,25 +112,31 @@ public abstract class KafkaStreamsBuilder { protected abstract Topology buildTopology(); /** - * Builds the {@link KafkaStreams} instance. + * Build the {@link Properties} for a {@code KafkaStreams} application. + * + * @return A {@code Properties} object. */ - public KafkaStreams build() { - // Check for required attributes for building properties. - Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set."); - Objects.requireNonNull(this.applicationName, "Application name has not been set."); - Objects.requireNonNull(this.applicationVersion, "Application version has not been set."); - - // Build properties. - final Properties properties = PropertiesBuilder + protected Properties buildProperties() { + return PropertiesBuilder .bootstrapServers(this.bootstrapServers) .applicationId(this.applicationName + '-' + this.applicationVersion) .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) .build(); + } + + /** + * Builds the {@link KafkaStreams} instance. + */ + public KafkaStreams build() { + // Check for required attributes for building properties. + Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set."); + Objects.requireNonNull(this.applicationName, "Application name has not been set."); + Objects.requireNonNull(this.applicationVersion, "Application version has not been set."); // Create the Kafka streams instance. - return new KafkaStreams(this.buildTopology(), properties); + return new KafkaStreams(this.buildTopology(), this.buildProperties()); } }