Skip to content
Snippets Groups Projects
Commit dd22b618 authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Add a buildProperty function to KafkaStreamsBuilder in commons

This allows the usecases to create its own Properties object if
needed.
parent ce6dc165
No related branches found
No related tags found
No related merge requests found
...@@ -112,25 +112,31 @@ public abstract class KafkaStreamsBuilder { ...@@ -112,25 +112,31 @@ public abstract class KafkaStreamsBuilder {
protected abstract Topology buildTopology(); protected abstract Topology buildTopology();
/** /**
* Builds the {@link KafkaStreams} instance. * Build the {@link Properties} for a {@code KafkaStreams} application.
*
* @return A {@code Properties} object.
*/ */
public KafkaStreams build() { protected Properties buildProperties() {
// Check for required attributes for building properties. return PropertiesBuilder
Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set.");
Objects.requireNonNull(this.applicationName, "Application name has not been set.");
Objects.requireNonNull(this.applicationVersion, "Application version has not been set.");
// Build properties.
final Properties properties = PropertiesBuilder
.bootstrapServers(this.bootstrapServers) .bootstrapServers(this.bootstrapServers)
.applicationId(this.applicationName + '-' + this.applicationVersion) .applicationId(this.applicationName + '-' + this.applicationVersion)
.set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
.set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0)
.set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0)
.build(); .build();
}
/**
* Builds the {@link KafkaStreams} instance.
*/
public KafkaStreams build() {
// Check for required attributes for building properties.
Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set.");
Objects.requireNonNull(this.applicationName, "Application name has not been set.");
Objects.requireNonNull(this.applicationVersion, "Application version has not been set.");
// Create the Kafka streams instance. // Create the Kafka streams instance.
return new KafkaStreams(this.buildTopology(), properties); return new KafkaStreams(this.buildTopology(), this.buildProperties());
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment