From 9510cc88303e3ec681a6817f20a115d7532c04e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de> Date: Thu, 14 May 2020 18:36:32 +0200 Subject: [PATCH] Use the common KafkaStreamsBuilder in all use cases The use cases extend the KafkaStreamsBuilder and only configure project specific stuff for the Kafaka streams application. This reduces in the KafkaStremBuilder classes the redundancy. --- .../spesb/uc1/application/HistoryService.java | 8 +- .../streamprocessing/KafkaStreamsBuilder.java | 98 ------------- .../Uc1KafkaStreamsBuilder.java | 23 +++ .../uc2/application/AggregationService.java | 95 +++++++------ .../streamprocessing/KafkaStreamsBuilder.java | 134 ------------------ .../Uc2KafkaStreamsBuilder.java | 63 ++++++++ .../spesb/uc3/application/HistoryService.java | 13 +- .../streamprocessing/KafkaStreamsBuilder.java | 105 -------------- .../Uc3KafkaStreamsBuilder.java | 43 ++++++ .../spesb/uc4/application/HistoryService.java | 14 +- .../streamprocessing/KafkaStreamsBuilder.java | 117 --------------- .../Uc4KafkaStreamsBuilder.java | 54 +++++++ 12 files changed, 258 insertions(+), 509 deletions(-) delete mode 100644 uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java create mode 100644 uc1-application/src/main/java/spesb/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java delete mode 100644 uc2-application/src/main/java/spesb/uc2/streamprocessing/KafkaStreamsBuilder.java create mode 100644 uc2-application/src/main/java/spesb/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java delete mode 100644 uc3-application/src/main/java/spesb/uc3/streamprocessing/KafkaStreamsBuilder.java create mode 100644 uc3-application/src/main/java/spesb/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java delete mode 100644 uc4-application/src/main/java/spesb/uc4/streamprocessing/KafkaStreamsBuilder.java create mode 100644 uc4-application/src/main/java/spesb/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java diff --git a/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java b/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java index 438ef510d..70bd6d5fd 100644 --- a/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java +++ b/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java @@ -3,7 +3,7 @@ package spesb.uc1.application; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import spesb.uc1.streamprocessing.KafkaStreamsBuilder; +import spesb.uc1.streamprocessing.Uc1KafkaStreamsBuilder; import titan.ccp.common.configuration.Configurations; /** @@ -30,14 +30,16 @@ public class HistoryService { */ private void createKafkaStreamsApplication() { - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() + final Uc1KafkaStreamsBuilder uc1KafkaStreamsBuilder = new Uc1KafkaStreamsBuilder(); + uc1KafkaStreamsBuilder.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)); + + final KafkaStreams kafkaStreams = uc1KafkaStreamsBuilder .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .build(); this.stopEvent.thenRun(kafkaStreams::close); diff --git a/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java b/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java deleted file mode 100644 index a289462a5..000000000 --- a/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java +++ /dev/null @@ -1,98 +0,0 @@ -package spesb.uc1.streamprocessing; - -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; -import titan.ccp.common.kafka.streams.PropertiesBuilder; - -/** - * Builder for the Kafka Streams configuration. - */ -public class KafkaStreamsBuilder { - // private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class); - - private String applicationName = "uc1-application"; // NOPMD - private String applicationVersion = "0.0.1"; // NOPMD - private String bootstrapServers; // NOPMD - private String inputTopic; // NOPMD - private int numThreads = -1; // NOPMD - private int commitIntervalMs = -1; // NOPMD - private int cacheMaxBytesBuff = -1; // NOPMD - - public KafkaStreamsBuilder applicationName(final String applicationName) { - this.applicationName = applicationName; - return this; - } - - public KafkaStreamsBuilder applicationVersion(final String applicationVersion) { - this.applicationVersion = applicationVersion; - return this; - } - - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - return this; - } - - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; - } - - /** - * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus - * one for using the default. - */ - public KafkaStreamsBuilder numThreads(final int numThreads) { - if (numThreads < -1 || numThreads == 0) { - throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); - } - this.numThreads = numThreads; - return this; - } - - /** - * Sets the Kafka Streams property for the frequency with which to save the position (offsets in - * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { - if (commitIntervalMs < -1) { - throw new IllegalArgumentException("Commit interval must be greater or equal -1."); - } - this.commitIntervalMs = commitIntervalMs; - return this; - } - - /** - * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches - * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { - if (cacheMaxBytesBuffering < -1) { - throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); - } - this.cacheMaxBytesBuff = cacheMaxBytesBuffering; - return this; - } - - /** - * Builds the {@link KafkaStreams} instance. - */ - public KafkaStreams build() { - Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic); - final Properties properties = PropertiesBuilder - .bootstrapServers(this.bootstrapServers) - .applicationId(this.applicationName + '-' + this.applicationVersion) - .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) - .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) - .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) - .build(); - return new KafkaStreams(topologyBuilder.build(), properties); - } - -} diff --git a/uc1-application/src/main/java/spesb/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java b/uc1-application/src/main/java/spesb/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java new file mode 100644 index 000000000..7283b39e2 --- /dev/null +++ b/uc1-application/src/main/java/spesb/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java @@ -0,0 +1,23 @@ +package spesb.uc1.streamprocessing; + +import java.util.Objects; +import org.apache.kafka.streams.Topology; +import spesb.commons.kafkastreams.KafkaStreamsBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder { + private String inputTopic; // NOPMD + + public KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + @Override + protected Topology buildTopology() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + return new TopologyBuilder(this.inputTopic).build(); + } +} diff --git a/uc2-application/src/main/java/spesb/uc2/application/AggregationService.java b/uc2-application/src/main/java/spesb/uc2/application/AggregationService.java index 79d8c94c7..bc6fdc067 100644 --- a/uc2-application/src/main/java/spesb/uc2/application/AggregationService.java +++ b/uc2-application/src/main/java/spesb/uc2/application/AggregationService.java @@ -4,56 +4,63 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import spesb.uc2.streamprocessing.KafkaStreamsBuilder; +import spesb.uc2.streamprocessing.Uc2KafkaStreamsBuilder; import titan.ccp.common.configuration.Configurations; /** - * A microservice that manages the history and, therefore, stores and aggregates - * incoming measurements. + * A microservice that manages the history and, therefore, stores and aggregates incoming + * measurements. * */ public class AggregationService { - private final Configuration config = Configurations.create(); - - private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); - - /** - * Start the service. - */ - public void run() { - this.createKafkaStreamsApplication(); - } - - public static void main(final String[] args) { - new AggregationService().run(); - } - - /** - * Build and start the underlying Kafka Streams Application of the service. - * - * @param clusterSession the database session which the application should use. - */ - private void createKafkaStreamsApplication() { - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) - .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .configurationTopic(this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC)) - .windowSize(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS))) - .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS))) - .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) - .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) - .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)).build(); - this.stopEvent.thenRun(kafkaStreams::close); - kafkaStreams.start(); - } - - /** - * Stop the service. - */ - public void stop() { - this.stopEvent.complete(null); - } + private final Configuration config = Configurations.create(); + + private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); + + /** + * Start the service. + */ + public void run() { + this.createKafkaStreamsApplication(); + } + + public static void main(final String[] args) { + new AggregationService().run(); + } + + /** + * Build and start the underlying Kafka Streams Application of the service. + * + * @param clusterSession the database session which the application should use. + */ + private void createKafkaStreamsApplication() { + // Use case specific stream configuration + final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(); + uc2KafkaStreamsBuilder + .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) + .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) + .configurationTopic(this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC)) + .windowSize(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS))) + .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS))); + + // Configuration of the stream application + final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) + .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) + .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) + .build(); + + this.stopEvent.thenRun(kafkaStreams::close); + kafkaStreams.start(); + } + + /** + * Stop the service. + */ + public void stop() { + this.stopEvent.complete(null); + } } diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/KafkaStreamsBuilder.java b/uc2-application/src/main/java/spesb/uc2/streamprocessing/KafkaStreamsBuilder.java deleted file mode 100644 index 9b43f5e66..000000000 --- a/uc2-application/src/main/java/spesb/uc2/streamprocessing/KafkaStreamsBuilder.java +++ /dev/null @@ -1,134 +0,0 @@ -package spesb.uc2.streamprocessing; - -import java.time.Duration; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; - -/** - * Builder for the Kafka Streams configuration. - */ -public class KafkaStreamsBuilder { // NOPMD builder method - - private static final String APPLICATION_NAME = "titan-ccp-aggregation"; - private static final String APPLICATION_VERSION = "0.0.1"; - - private static final Duration WINDOW_SIZE_DEFAULT = Duration.ofSeconds(1); - private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO; - - // private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class); - - private String bootstrapServers; // NOPMD - private String inputTopic; // NOPMD - private String outputTopic; // NOPMD - private String configurationTopic; // NOPMD - private Duration windowSize = null; // NOPMD - private Duration gracePeriod = null; // NOPMD - private int numThreads = -1; // NOPMD - private int commitIntervalMs = -1; // NOPMD - private int cacheMaxBytesBuffering = -1; // NOPMD - - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; - } - - public KafkaStreamsBuilder outputTopic(final String outputTopic) { - this.outputTopic = outputTopic; - return this; - } - - public KafkaStreamsBuilder configurationTopic(final String configurationTopic) { - this.configurationTopic = configurationTopic; - return this; - } - - public KafkaStreamsBuilder windowSize(final Duration windowSize) { - this.windowSize = Objects.requireNonNull(windowSize); - return this; - } - - public KafkaStreamsBuilder gracePeriod(final Duration gracePeriod) { - this.gracePeriod = Objects.requireNonNull(gracePeriod); - return this; - } - - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - return this; - } - - /** - * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus - * one for using the default. - */ - public KafkaStreamsBuilder numThreads(final int numThreads) { - if (numThreads < -1 || numThreads == 0) { - throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); - } - this.numThreads = numThreads; - return this; - } - - /** - * Sets the Kafka Streams property for the frequency with which to save the position (offsets in - * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { - if (commitIntervalMs < -1) { - throw new IllegalArgumentException("Commit interval must be greater or equal -1."); - } - this.commitIntervalMs = commitIntervalMs; - return this; - } - - /** - * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches - * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { - if (cacheMaxBytesBuffering < -1) { - throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); - } - this.cacheMaxBytesBuffering = cacheMaxBytesBuffering; - return this; - } - - /** - * Builds the {@link KafkaStreams} instance. - */ - public KafkaStreams build() { - Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); - Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set."); - // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder( - this.inputTopic, - this.outputTopic, - this.configurationTopic, - this.windowSize == null ? WINDOW_SIZE_DEFAULT : this.windowSize, - this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod); - return new KafkaStreams(topologyBuilder.build(), this.buildProperties()); - } - - private Properties buildProperties() { - final Properties properties = new Properties(); - properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); - properties.put(StreamsConfig.APPLICATION_ID_CONFIG, - APPLICATION_NAME + '-' + APPLICATION_VERSION); // TODO as parameter - if (this.numThreads > 0) { - properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads); - } - if (this.commitIntervalMs >= 0) { - properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs); - } - if (this.cacheMaxBytesBuffering >= 0) { - properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuffering); - } - return properties; - } - -} diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java b/uc2-application/src/main/java/spesb/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java new file mode 100644 index 000000000..875a45ee9 --- /dev/null +++ b/uc2-application/src/main/java/spesb/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java @@ -0,0 +1,63 @@ +package spesb.uc2.streamprocessing; + +import java.time.Duration; +import java.util.Objects; +import org.apache.kafka.streams.Topology; +import spesb.commons.kafkastreams.KafkaStreamsBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD builder method + + private static final Duration WINDOW_SIZE_DEFAULT = Duration.ofSeconds(1); + private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO; + + private String inputTopic; // NOPMD + private String outputTopic; // NOPMD + private String configurationTopic; // NOPMD + private Duration windowSize; // NOPMD + private Duration gracePeriod; // NOPMD + + public Uc2KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + public Uc2KafkaStreamsBuilder outputTopic(final String outputTopic) { + this.outputTopic = outputTopic; + return this; + } + + public Uc2KafkaStreamsBuilder configurationTopic(final String configurationTopic) { + this.configurationTopic = configurationTopic; + return this; + } + + public Uc2KafkaStreamsBuilder windowSize(final Duration windowSize) { + this.windowSize = Objects.requireNonNull(windowSize); + return this; + } + + public Uc2KafkaStreamsBuilder gracePeriod(final Duration gracePeriod) { + this.gracePeriod = Objects.requireNonNull(gracePeriod); + return this; + } + + @Override + protected Topology buildTopology() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); + Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set."); + + final TopologyBuilder topologyBuilder = new TopologyBuilder( + this.inputTopic, + this.outputTopic, + this.configurationTopic, + this.windowSize == null ? WINDOW_SIZE_DEFAULT : this.windowSize, + this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod); + + return topologyBuilder.build(); + } + +} diff --git a/uc3-application/src/main/java/spesb/uc3/application/HistoryService.java b/uc3-application/src/main/java/spesb/uc3/application/HistoryService.java index 2b6c40e51..90f5a828e 100644 --- a/uc3-application/src/main/java/spesb/uc3/application/HistoryService.java +++ b/uc3-application/src/main/java/spesb/uc3/application/HistoryService.java @@ -5,7 +5,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import spesb.uc3.streamprocessing.KafkaStreamsBuilder; +import spesb.uc3.streamprocessing.Uc3KafkaStreamsBuilder; import titan.ccp.common.configuration.Configurations; /** @@ -33,11 +33,16 @@ public class HistoryService { * */ private void createKafkaStreamsApplication() { - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + // Use case specific stream configuration + final Uc3KafkaStreamsBuilder uc3KafkaStreamsBuilder = new Uc3KafkaStreamsBuilder(); + uc3KafkaStreamsBuilder .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .windowDuration(Duration.ofMinutes(this.windowDurationMinutes)) + .windowDuration(Duration.ofMinutes(this.windowDurationMinutes)); + + // Configuration of the stream application + final KafkaStreams kafkaStreams = uc3KafkaStreamsBuilder + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) diff --git a/uc3-application/src/main/java/spesb/uc3/streamprocessing/KafkaStreamsBuilder.java b/uc3-application/src/main/java/spesb/uc3/streamprocessing/KafkaStreamsBuilder.java deleted file mode 100644 index 28382bedd..000000000 --- a/uc3-application/src/main/java/spesb/uc3/streamprocessing/KafkaStreamsBuilder.java +++ /dev/null @@ -1,105 +0,0 @@ -package spesb.uc3.streamprocessing; - -import java.time.Duration; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; -import titan.ccp.common.kafka.streams.PropertiesBuilder; - -/** - * Builder for the Kafka Streams configuration. - */ -public class KafkaStreamsBuilder { - - private static final String APPLICATION_NAME = "titan-ccp-history"; - private static final String APPLICATION_VERSION = "0.0.1"; - - // private static final Logger LOGGER = - // LoggerFactory.getLogger(KafkaStreamsBuilder.class); - - private String bootstrapServers; // NOPMD - private String inputTopic; // NOPMD - private String outputTopic; // NOPMD - private Duration windowDuration; // NOPMD - private int numThreads = -1; // NOPMD - private int commitIntervalMs = -1; // NOPMD - private int cacheMaxBytesBuff = -1; // NOPMD - - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; - } - - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - return this; - } - - public KafkaStreamsBuilder outputTopic(final String outputTopic) { - this.outputTopic = outputTopic; - return this; - } - - public KafkaStreamsBuilder windowDuration(final Duration windowDuration) { - this.windowDuration = windowDuration; - return this; - } - - /** - * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus - * one for using the default. - */ - public KafkaStreamsBuilder numThreads(final int numThreads) { - if (numThreads < -1 || numThreads == 0) { - throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); - } - this.numThreads = numThreads; - return this; - } - - /** - * Sets the Kafka Streams property for the frequency with which to save the position (offsets in - * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { - if (commitIntervalMs < -1) { - throw new IllegalArgumentException("Commit interval must be greater or equal -1."); - } - this.commitIntervalMs = commitIntervalMs; - return this; - } - - /** - * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches - * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { - if (cacheMaxBytesBuffering < -1) { - throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); - } - this.cacheMaxBytesBuff = cacheMaxBytesBuffering; - return this; - } - - /** - * Builds the {@link KafkaStreams} instance. - */ - public KafkaStreams build() { - Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, - this.windowDuration); - final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers) - .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter - .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) - .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) - .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) - // .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG") - .build(); - return new KafkaStreams(topologyBuilder.build(), properties); - } - -} diff --git a/uc3-application/src/main/java/spesb/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java b/uc3-application/src/main/java/spesb/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java new file mode 100644 index 000000000..b7f5d517c --- /dev/null +++ b/uc3-application/src/main/java/spesb/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java @@ -0,0 +1,43 @@ +package spesb.uc3.streamprocessing; + +import java.time.Duration; +import java.util.Objects; +import org.apache.kafka.streams.Topology; +import spesb.commons.kafkastreams.KafkaStreamsBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { + + private String inputTopic; // NOPMD + private String outputTopic; // NOPMD + private Duration windowDuration; // NOPMD + + public Uc3KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + public Uc3KafkaStreamsBuilder outputTopic(final String outputTopic) { + this.outputTopic = outputTopic; + return this; + } + + public Uc3KafkaStreamsBuilder windowDuration(final Duration windowDuration) { + this.windowDuration = windowDuration; + return this; + } + + @Override + protected Topology buildTopology() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); + Objects.requireNonNull(this.windowDuration, "Window duration has not been set."); + + final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, + this.windowDuration); + return topologyBuilder.build(); + } + +} diff --git a/uc4-application/src/main/java/spesb/uc4/application/HistoryService.java b/uc4-application/src/main/java/spesb/uc4/application/HistoryService.java index f86f0cb7e..56275d594 100644 --- a/uc4-application/src/main/java/spesb/uc4/application/HistoryService.java +++ b/uc4-application/src/main/java/spesb/uc4/application/HistoryService.java @@ -4,7 +4,7 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import spesb.uc4.streamprocessing.KafkaStreamsBuilder; +import spesb.uc4.streamprocessing.Uc4KafkaStreamsBuilder; import titan.ccp.common.configuration.Configurations; /** @@ -30,18 +30,24 @@ public class HistoryService { * */ private void createKafkaStreamsApplication() { - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + // Use case specific stream configuration + final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder(); + uc4KafkaStreamsBuilder .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .aggregtionDuration( Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS))) .aggregationAdvance( - Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))) + Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))); + + // Configuration of the stream application + final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .build(); + this.stopEvent.thenRun(kafkaStreams::close); kafkaStreams.start(); } diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/KafkaStreamsBuilder.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/KafkaStreamsBuilder.java deleted file mode 100644 index 9cbff4f61..000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/KafkaStreamsBuilder.java +++ /dev/null @@ -1,117 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.Duration; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; -import titan.ccp.common.kafka.streams.PropertiesBuilder; - -/** - * Builder for the Kafka Streams configuration. - */ -public class KafkaStreamsBuilder { - - private static final String APPLICATION_NAME = "titan-ccp-history"; - private static final String APPLICATION_VERSION = "0.0.1"; - - // private static final Logger LOGGER = - // LoggerFactory.getLogger(KafkaStreamsBuilder.class); - - private String bootstrapServers; // NOPMD - private String inputTopic; // NOPMD - private String outputTopic; // NOPMD - private Duration aggregtionDuration; // NOPMD - private Duration aggregationAdvance; // NOPMD - private int numThreads = -1; // NOPMD - private int commitIntervalMs = -1; // NOPMD - private int cacheMaxBytesBuff = -1; // NOPMD - - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - return this; - } - - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; - } - - public KafkaStreamsBuilder outputTopic(final String outputTopic) { - this.outputTopic = outputTopic; - return this; - } - - public KafkaStreamsBuilder aggregtionDuration(final Duration aggregtionDuration) { - this.aggregtionDuration = aggregtionDuration; - return this; - } - - public KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) { - this.aggregationAdvance = aggregationAdvance; - return this; - } - - /** - * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus - * one for using the default. - */ - public KafkaStreamsBuilder numThreads(final int numThreads) { - if (numThreads < -1 || numThreads == 0) { - throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); - } - this.numThreads = numThreads; - return this; - } - - /** - * Sets the Kafka Streams property for the frequency with which to save the position (offsets in - * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { - if (commitIntervalMs < -1) { - throw new IllegalArgumentException("Commit interval must be greater or equal -1."); - } - this.commitIntervalMs = commitIntervalMs; - return this; - } - - /** - * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches - * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { - if (cacheMaxBytesBuffering < -1) { - throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); - } - this.cacheMaxBytesBuff = cacheMaxBytesBuffering; - return this; - } - - /** - * Builds the {@link KafkaStreams} instance. - */ - public KafkaStreams build() { - Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); - Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); - Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set."); - // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder( - this.inputTopic, - this.outputTopic, - this.aggregtionDuration, - this.aggregationAdvance); - final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers) - .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter - .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) - .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) - .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) - // .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG") - .build(); - return new KafkaStreams(topologyBuilder.build(), properties); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java new file mode 100644 index 000000000..d248c0215 --- /dev/null +++ b/uc4-application/src/main/java/spesb/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java @@ -0,0 +1,54 @@ +package spesb.uc4.streamprocessing; + +import java.time.Duration; +import java.util.Objects; +import org.apache.kafka.streams.Topology; +import spesb.commons.kafkastreams.KafkaStreamsBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { + + private String inputTopic; // NOPMD + private String outputTopic; // NOPMD + private Duration aggregtionDuration; // NOPMD + private Duration aggregationAdvance; // NOPMD + + public Uc4KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + public Uc4KafkaStreamsBuilder outputTopic(final String outputTopic) { + this.outputTopic = outputTopic; + return this; + } + + public Uc4KafkaStreamsBuilder aggregtionDuration(final Duration aggregtionDuration) { + this.aggregtionDuration = aggregtionDuration; + return this; + } + + public Uc4KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) { + this.aggregationAdvance = aggregationAdvance; + return this; + } + + @Override + protected Topology buildTopology() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); + Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); + Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set."); + + final TopologyBuilder topologyBuilder = new TopologyBuilder( + this.inputTopic, + this.outputTopic, + this.aggregtionDuration, + this.aggregationAdvance); + + return topologyBuilder.build(); + } + +} -- GitLab