Skip to content
Snippets Groups Projects
Commit 9510cc88 authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Use the common KafkaStreamsBuilder in all use cases

The use cases extend the KafkaStreamsBuilder and only configure
project specific stuff for the Kafaka streams application.
This reduces in the KafkaStremBuilder classes the redundancy.
parent 66504d47
Branches
Tags
1 merge request!4Feature/cleanup kafka streams
Showing
with 258 additions and 509 deletions
...@@ -3,7 +3,7 @@ package spesb.uc1.application; ...@@ -3,7 +3,7 @@ package spesb.uc1.application;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import spesb.uc1.streamprocessing.KafkaStreamsBuilder; import spesb.uc1.streamprocessing.Uc1KafkaStreamsBuilder;
import titan.ccp.common.configuration.Configurations; import titan.ccp.common.configuration.Configurations;
/** /**
...@@ -30,14 +30,16 @@ public class HistoryService { ...@@ -30,14 +30,16 @@ public class HistoryService {
*/ */
private void createKafkaStreamsApplication() { private void createKafkaStreamsApplication() {
final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() final Uc1KafkaStreamsBuilder uc1KafkaStreamsBuilder = new Uc1KafkaStreamsBuilder();
uc1KafkaStreamsBuilder.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC));
final KafkaStreams kafkaStreams = uc1KafkaStreamsBuilder
.applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME))
.applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
.build(); .build();
this.stopEvent.thenRun(kafkaStreams::close); this.stopEvent.thenRun(kafkaStreams::close);
......
package spesb.uc1.streamprocessing;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import titan.ccp.common.kafka.streams.PropertiesBuilder;
/**
* Builder for the Kafka Streams configuration.
*/
public class KafkaStreamsBuilder {
// private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class);
private String applicationName = "uc1-application"; // NOPMD
private String applicationVersion = "0.0.1"; // NOPMD
private String bootstrapServers; // NOPMD
private String inputTopic; // NOPMD
private int numThreads = -1; // NOPMD
private int commitIntervalMs = -1; // NOPMD
private int cacheMaxBytesBuff = -1; // NOPMD
public KafkaStreamsBuilder applicationName(final String applicationName) {
this.applicationName = applicationName;
return this;
}
public KafkaStreamsBuilder applicationVersion(final String applicationVersion) {
this.applicationVersion = applicationVersion;
return this;
}
public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
return this;
}
public KafkaStreamsBuilder inputTopic(final String inputTopic) {
this.inputTopic = inputTopic;
return this;
}
/**
* Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus
* one for using the default.
*/
public KafkaStreamsBuilder numThreads(final int numThreads) {
if (numThreads < -1 || numThreads == 0) {
throw new IllegalArgumentException("Number of threads must be greater 0 or -1.");
}
this.numThreads = numThreads;
return this;
}
/**
* Sets the Kafka Streams property for the frequency with which to save the position (offsets in
* source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for
* example, when processing bulks of records. Can be minus one for using the default.
*/
public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) {
if (commitIntervalMs < -1) {
throw new IllegalArgumentException("Commit interval must be greater or equal -1.");
}
this.commitIntervalMs = commitIntervalMs;
return this;
}
/**
* Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches
* across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for
* example, when processing bulks of records. Can be minus one for using the default.
*/
public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) {
if (cacheMaxBytesBuffering < -1) {
throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1.");
}
this.cacheMaxBytesBuff = cacheMaxBytesBuffering;
return this;
}
/**
* Builds the {@link KafkaStreams} instance.
*/
public KafkaStreams build() {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
// TODO log parameters
final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic);
final Properties properties = PropertiesBuilder
.bootstrapServers(this.bootstrapServers)
.applicationId(this.applicationName + '-' + this.applicationVersion)
.set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
.set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0)
.set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0)
.build();
return new KafkaStreams(topologyBuilder.build(), properties);
}
}
package spesb.uc1.streamprocessing;
import java.util.Objects;
import org.apache.kafka.streams.Topology;
import spesb.commons.kafkastreams.KafkaStreamsBuilder;
/**
* Builder for the Kafka Streams configuration.
*/
public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder {
private String inputTopic; // NOPMD
public KafkaStreamsBuilder inputTopic(final String inputTopic) {
this.inputTopic = inputTopic;
return this;
}
@Override
protected Topology buildTopology() {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
return new TopologyBuilder(this.inputTopic).build();
}
}
...@@ -4,12 +4,12 @@ import java.time.Duration; ...@@ -4,12 +4,12 @@ import java.time.Duration;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import spesb.uc2.streamprocessing.KafkaStreamsBuilder; import spesb.uc2.streamprocessing.Uc2KafkaStreamsBuilder;
import titan.ccp.common.configuration.Configurations; import titan.ccp.common.configuration.Configurations;
/** /**
* A microservice that manages the history and, therefore, stores and aggregates * A microservice that manages the history and, therefore, stores and aggregates incoming
* incoming measurements. * measurements.
* *
*/ */
public class AggregationService { public class AggregationService {
...@@ -35,16 +35,23 @@ public class AggregationService { ...@@ -35,16 +35,23 @@ public class AggregationService {
* @param clusterSession the database session which the application should use. * @param clusterSession the database session which the application should use.
*/ */
private void createKafkaStreamsApplication() { private void createKafkaStreamsApplication() {
final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() // Use case specific stream configuration
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder();
uc2KafkaStreamsBuilder
.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
.outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
.configurationTopic(this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC)) .configurationTopic(this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC))
.windowSize(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS))) .windowSize(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS)))
.gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS))) .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS)));
// Configuration of the stream application
final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)).build(); .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
.build();
this.stopEvent.thenRun(kafkaStreams::close); this.stopEvent.thenRun(kafkaStreams::close);
kafkaStreams.start(); kafkaStreams.start();
} }
......
package spesb.uc2.streamprocessing;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
/**
* Builder for the Kafka Streams configuration.
*/
public class KafkaStreamsBuilder { // NOPMD builder method
private static final String APPLICATION_NAME = "titan-ccp-aggregation";
private static final String APPLICATION_VERSION = "0.0.1";
private static final Duration WINDOW_SIZE_DEFAULT = Duration.ofSeconds(1);
private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO;
// private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class);
private String bootstrapServers; // NOPMD
private String inputTopic; // NOPMD
private String outputTopic; // NOPMD
private String configurationTopic; // NOPMD
private Duration windowSize = null; // NOPMD
private Duration gracePeriod = null; // NOPMD
private int numThreads = -1; // NOPMD
private int commitIntervalMs = -1; // NOPMD
private int cacheMaxBytesBuffering = -1; // NOPMD
public KafkaStreamsBuilder inputTopic(final String inputTopic) {
this.inputTopic = inputTopic;
return this;
}
public KafkaStreamsBuilder outputTopic(final String outputTopic) {
this.outputTopic = outputTopic;
return this;
}
public KafkaStreamsBuilder configurationTopic(final String configurationTopic) {
this.configurationTopic = configurationTopic;
return this;
}
public KafkaStreamsBuilder windowSize(final Duration windowSize) {
this.windowSize = Objects.requireNonNull(windowSize);
return this;
}
public KafkaStreamsBuilder gracePeriod(final Duration gracePeriod) {
this.gracePeriod = Objects.requireNonNull(gracePeriod);
return this;
}
public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
return this;
}
/**
* Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus
* one for using the default.
*/
public KafkaStreamsBuilder numThreads(final int numThreads) {
if (numThreads < -1 || numThreads == 0) {
throw new IllegalArgumentException("Number of threads must be greater 0 or -1.");
}
this.numThreads = numThreads;
return this;
}
/**
* Sets the Kafka Streams property for the frequency with which to save the position (offsets in
* source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for
* example, when processing bulks of records. Can be minus one for using the default.
*/
public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) {
if (commitIntervalMs < -1) {
throw new IllegalArgumentException("Commit interval must be greater or equal -1.");
}
this.commitIntervalMs = commitIntervalMs;
return this;
}
/**
* Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches
* across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for
* example, when processing bulks of records. Can be minus one for using the default.
*/
public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) {
if (cacheMaxBytesBuffering < -1) {
throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1.");
}
this.cacheMaxBytesBuffering = cacheMaxBytesBuffering;
return this;
}
/**
* Builds the {@link KafkaStreams} instance.
*/
public KafkaStreams build() {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set.");
// TODO log parameters
final TopologyBuilder topologyBuilder = new TopologyBuilder(
this.inputTopic,
this.outputTopic,
this.configurationTopic,
this.windowSize == null ? WINDOW_SIZE_DEFAULT : this.windowSize,
this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod);
return new KafkaStreams(topologyBuilder.build(), this.buildProperties());
}
private Properties buildProperties() {
final Properties properties = new Properties();
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
properties.put(StreamsConfig.APPLICATION_ID_CONFIG,
APPLICATION_NAME + '-' + APPLICATION_VERSION); // TODO as parameter
if (this.numThreads > 0) {
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads);
}
if (this.commitIntervalMs >= 0) {
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs);
}
if (this.cacheMaxBytesBuffering >= 0) {
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuffering);
}
return properties;
}
}
package spesb.uc2.streamprocessing;
import java.time.Duration;
import java.util.Objects;
import org.apache.kafka.streams.Topology;
import spesb.commons.kafkastreams.KafkaStreamsBuilder;
/**
* Builder for the Kafka Streams configuration.
*/
public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD builder method
private static final Duration WINDOW_SIZE_DEFAULT = Duration.ofSeconds(1);
private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO;
private String inputTopic; // NOPMD
private String outputTopic; // NOPMD
private String configurationTopic; // NOPMD
private Duration windowSize; // NOPMD
private Duration gracePeriod; // NOPMD
public Uc2KafkaStreamsBuilder inputTopic(final String inputTopic) {
this.inputTopic = inputTopic;
return this;
}
public Uc2KafkaStreamsBuilder outputTopic(final String outputTopic) {
this.outputTopic = outputTopic;
return this;
}
public Uc2KafkaStreamsBuilder configurationTopic(final String configurationTopic) {
this.configurationTopic = configurationTopic;
return this;
}
public Uc2KafkaStreamsBuilder windowSize(final Duration windowSize) {
this.windowSize = Objects.requireNonNull(windowSize);
return this;
}
public Uc2KafkaStreamsBuilder gracePeriod(final Duration gracePeriod) {
this.gracePeriod = Objects.requireNonNull(gracePeriod);
return this;
}
@Override
protected Topology buildTopology() {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set.");
final TopologyBuilder topologyBuilder = new TopologyBuilder(
this.inputTopic,
this.outputTopic,
this.configurationTopic,
this.windowSize == null ? WINDOW_SIZE_DEFAULT : this.windowSize,
this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod);
return topologyBuilder.build();
}
}
...@@ -5,7 +5,7 @@ import java.util.Objects; ...@@ -5,7 +5,7 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import spesb.uc3.streamprocessing.KafkaStreamsBuilder; import spesb.uc3.streamprocessing.Uc3KafkaStreamsBuilder;
import titan.ccp.common.configuration.Configurations; import titan.ccp.common.configuration.Configurations;
/** /**
...@@ -33,11 +33,16 @@ public class HistoryService { ...@@ -33,11 +33,16 @@ public class HistoryService {
* *
*/ */
private void createKafkaStreamsApplication() { private void createKafkaStreamsApplication() {
final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() // Use case specific stream configuration
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) final Uc3KafkaStreamsBuilder uc3KafkaStreamsBuilder = new Uc3KafkaStreamsBuilder();
uc3KafkaStreamsBuilder
.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
.outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
.windowDuration(Duration.ofMinutes(this.windowDurationMinutes)) .windowDuration(Duration.ofMinutes(this.windowDurationMinutes));
// Configuration of the stream application
final KafkaStreams kafkaStreams = uc3KafkaStreamsBuilder
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
......
package spesb.uc3.streamprocessing;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import titan.ccp.common.kafka.streams.PropertiesBuilder;
/**
* Builder for the Kafka Streams configuration.
*/
public class KafkaStreamsBuilder {
private static final String APPLICATION_NAME = "titan-ccp-history";
private static final String APPLICATION_VERSION = "0.0.1";
// private static final Logger LOGGER =
// LoggerFactory.getLogger(KafkaStreamsBuilder.class);
private String bootstrapServers; // NOPMD
private String inputTopic; // NOPMD
private String outputTopic; // NOPMD
private Duration windowDuration; // NOPMD
private int numThreads = -1; // NOPMD
private int commitIntervalMs = -1; // NOPMD
private int cacheMaxBytesBuff = -1; // NOPMD
public KafkaStreamsBuilder inputTopic(final String inputTopic) {
this.inputTopic = inputTopic;
return this;
}
public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
return this;
}
public KafkaStreamsBuilder outputTopic(final String outputTopic) {
this.outputTopic = outputTopic;
return this;
}
public KafkaStreamsBuilder windowDuration(final Duration windowDuration) {
this.windowDuration = windowDuration;
return this;
}
/**
* Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus
* one for using the default.
*/
public KafkaStreamsBuilder numThreads(final int numThreads) {
if (numThreads < -1 || numThreads == 0) {
throw new IllegalArgumentException("Number of threads must be greater 0 or -1.");
}
this.numThreads = numThreads;
return this;
}
/**
* Sets the Kafka Streams property for the frequency with which to save the position (offsets in
* source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for
* example, when processing bulks of records. Can be minus one for using the default.
*/
public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) {
if (commitIntervalMs < -1) {
throw new IllegalArgumentException("Commit interval must be greater or equal -1.");
}
this.commitIntervalMs = commitIntervalMs;
return this;
}
/**
* Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches
* across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for
* example, when processing bulks of records. Can be minus one for using the default.
*/
public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) {
if (cacheMaxBytesBuffering < -1) {
throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1.");
}
this.cacheMaxBytesBuff = cacheMaxBytesBuffering;
return this;
}
/**
* Builds the {@link KafkaStreams} instance.
*/
public KafkaStreams build() {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
// TODO log parameters
final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
this.windowDuration);
final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers)
.applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter
.set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
.set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0)
.set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0)
// .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
.build();
return new KafkaStreams(topologyBuilder.build(), properties);
}
}
package spesb.uc3.streamprocessing;
import java.time.Duration;
import java.util.Objects;
import org.apache.kafka.streams.Topology;
import spesb.commons.kafkastreams.KafkaStreamsBuilder;
/**
* Builder for the Kafka Streams configuration.
*/
public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder {
private String inputTopic; // NOPMD
private String outputTopic; // NOPMD
private Duration windowDuration; // NOPMD
public Uc3KafkaStreamsBuilder inputTopic(final String inputTopic) {
this.inputTopic = inputTopic;
return this;
}
public Uc3KafkaStreamsBuilder outputTopic(final String outputTopic) {
this.outputTopic = outputTopic;
return this;
}
public Uc3KafkaStreamsBuilder windowDuration(final Duration windowDuration) {
this.windowDuration = windowDuration;
return this;
}
@Override
protected Topology buildTopology() {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
Objects.requireNonNull(this.windowDuration, "Window duration has not been set.");
final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
this.windowDuration);
return topologyBuilder.build();
}
}
...@@ -4,7 +4,7 @@ import java.time.Duration; ...@@ -4,7 +4,7 @@ import java.time.Duration;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import spesb.uc4.streamprocessing.KafkaStreamsBuilder; import spesb.uc4.streamprocessing.Uc4KafkaStreamsBuilder;
import titan.ccp.common.configuration.Configurations; import titan.ccp.common.configuration.Configurations;
/** /**
...@@ -30,18 +30,24 @@ public class HistoryService { ...@@ -30,18 +30,24 @@ public class HistoryService {
* *
*/ */
private void createKafkaStreamsApplication() { private void createKafkaStreamsApplication() {
final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() // Use case specific stream configuration
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder();
uc4KafkaStreamsBuilder
.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
.outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
.aggregtionDuration( .aggregtionDuration(
Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS))) Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)))
.aggregationAdvance( .aggregationAdvance(
Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))) Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)));
// Configuration of the stream application
final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
.build(); .build();
this.stopEvent.thenRun(kafkaStreams::close); this.stopEvent.thenRun(kafkaStreams::close);
kafkaStreams.start(); kafkaStreams.start();
} }
......
package spesb.uc4.streamprocessing;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import titan.ccp.common.kafka.streams.PropertiesBuilder;
/**
* Builder for the Kafka Streams configuration.
*/
public class KafkaStreamsBuilder {
private static final String APPLICATION_NAME = "titan-ccp-history";
private static final String APPLICATION_VERSION = "0.0.1";
// private static final Logger LOGGER =
// LoggerFactory.getLogger(KafkaStreamsBuilder.class);
private String bootstrapServers; // NOPMD
private String inputTopic; // NOPMD
private String outputTopic; // NOPMD
private Duration aggregtionDuration; // NOPMD
private Duration aggregationAdvance; // NOPMD
private int numThreads = -1; // NOPMD
private int commitIntervalMs = -1; // NOPMD
private int cacheMaxBytesBuff = -1; // NOPMD
public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
return this;
}
public KafkaStreamsBuilder inputTopic(final String inputTopic) {
this.inputTopic = inputTopic;
return this;
}
public KafkaStreamsBuilder outputTopic(final String outputTopic) {
this.outputTopic = outputTopic;
return this;
}
public KafkaStreamsBuilder aggregtionDuration(final Duration aggregtionDuration) {
this.aggregtionDuration = aggregtionDuration;
return this;
}
public KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) {
this.aggregationAdvance = aggregationAdvance;
return this;
}
/**
* Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus
* one for using the default.
*/
public KafkaStreamsBuilder numThreads(final int numThreads) {
if (numThreads < -1 || numThreads == 0) {
throw new IllegalArgumentException("Number of threads must be greater 0 or -1.");
}
this.numThreads = numThreads;
return this;
}
/**
* Sets the Kafka Streams property for the frequency with which to save the position (offsets in
* source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for
* example, when processing bulks of records. Can be minus one for using the default.
*/
public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) {
if (commitIntervalMs < -1) {
throw new IllegalArgumentException("Commit interval must be greater or equal -1.");
}
this.commitIntervalMs = commitIntervalMs;
return this;
}
/**
* Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches
* across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for
* example, when processing bulks of records. Can be minus one for using the default.
*/
public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) {
if (cacheMaxBytesBuffering < -1) {
throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1.");
}
this.cacheMaxBytesBuff = cacheMaxBytesBuffering;
return this;
}
/**
* Builds the {@link KafkaStreams} instance.
*/
public KafkaStreams build() {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set.");
Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set.");
// TODO log parameters
final TopologyBuilder topologyBuilder = new TopologyBuilder(
this.inputTopic,
this.outputTopic,
this.aggregtionDuration,
this.aggregationAdvance);
final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers)
.applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter
.set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
.set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0)
.set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0)
// .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
.build();
return new KafkaStreams(topologyBuilder.build(), properties);
}
}
package spesb.uc4.streamprocessing;
import java.time.Duration;
import java.util.Objects;
import org.apache.kafka.streams.Topology;
import spesb.commons.kafkastreams.KafkaStreamsBuilder;
/**
* Builder for the Kafka Streams configuration.
*/
public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder {
private String inputTopic; // NOPMD
private String outputTopic; // NOPMD
private Duration aggregtionDuration; // NOPMD
private Duration aggregationAdvance; // NOPMD
public Uc4KafkaStreamsBuilder inputTopic(final String inputTopic) {
this.inputTopic = inputTopic;
return this;
}
public Uc4KafkaStreamsBuilder outputTopic(final String outputTopic) {
this.outputTopic = outputTopic;
return this;
}
public Uc4KafkaStreamsBuilder aggregtionDuration(final Duration aggregtionDuration) {
this.aggregtionDuration = aggregtionDuration;
return this;
}
public Uc4KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) {
this.aggregationAdvance = aggregationAdvance;
return this;
}
@Override
protected Topology buildTopology() {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set.");
Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set.");
final TopologyBuilder topologyBuilder = new TopologyBuilder(
this.inputTopic,
this.outputTopic,
this.aggregtionDuration,
this.aggregationAdvance);
return topologyBuilder.build();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment