diff --git a/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java b/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java index 438ef510daa9af084ede9f9ef01e8003ad70efb5..70bd6d5fd29eae95bfd7cb895f6c9a7b4176f1c2 100644 --- a/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java +++ b/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java @@ -3,7 +3,7 @@ package spesb.uc1.application; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import spesb.uc1.streamprocessing.KafkaStreamsBuilder; +import spesb.uc1.streamprocessing.Uc1KafkaStreamsBuilder; import titan.ccp.common.configuration.Configurations; /** @@ -30,14 +30,16 @@ public class HistoryService { */ private void createKafkaStreamsApplication() { - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() + final Uc1KafkaStreamsBuilder uc1KafkaStreamsBuilder = new Uc1KafkaStreamsBuilder(); + uc1KafkaStreamsBuilder.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)); + + final KafkaStreams kafkaStreams = uc1KafkaStreamsBuilder .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .build(); this.stopEvent.thenRun(kafkaStreams::close); diff --git a/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java b/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java deleted file mode 100644 index a289462a5b6a1f05c347dc66109633046b462c44..0000000000000000000000000000000000000000 --- a/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java +++ /dev/null @@ -1,98 +0,0 @@ -package spesb.uc1.streamprocessing; - -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; -import titan.ccp.common.kafka.streams.PropertiesBuilder; - -/** - * Builder for the Kafka Streams configuration. - */ -public class KafkaStreamsBuilder { - // private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class); - - private String applicationName = "uc1-application"; // NOPMD - private String applicationVersion = "0.0.1"; // NOPMD - private String bootstrapServers; // NOPMD - private String inputTopic; // NOPMD - private int numThreads = -1; // NOPMD - private int commitIntervalMs = -1; // NOPMD - private int cacheMaxBytesBuff = -1; // NOPMD - - public KafkaStreamsBuilder applicationName(final String applicationName) { - this.applicationName = applicationName; - return this; - } - - public KafkaStreamsBuilder applicationVersion(final String applicationVersion) { - this.applicationVersion = applicationVersion; - return this; - } - - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - return this; - } - - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; - } - - /** - * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus - * one for using the default. - */ - public KafkaStreamsBuilder numThreads(final int numThreads) { - if (numThreads < -1 || numThreads == 0) { - throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); - } - this.numThreads = numThreads; - return this; - } - - /** - * Sets the Kafka Streams property for the frequency with which to save the position (offsets in - * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { - if (commitIntervalMs < -1) { - throw new IllegalArgumentException("Commit interval must be greater or equal -1."); - } - this.commitIntervalMs = commitIntervalMs; - return this; - } - - /** - * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches - * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { - if (cacheMaxBytesBuffering < -1) { - throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); - } - this.cacheMaxBytesBuff = cacheMaxBytesBuffering; - return this; - } - - /** - * Builds the {@link KafkaStreams} instance. - */ - public KafkaStreams build() { - Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic); - final Properties properties = PropertiesBuilder - .bootstrapServers(this.bootstrapServers) - .applicationId(this.applicationName + '-' + this.applicationVersion) - .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) - .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) - .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) - .build(); - return new KafkaStreams(topologyBuilder.build(), properties); - } - -} diff --git a/uc1-application/src/main/java/spesb/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java b/uc1-application/src/main/java/spesb/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..7283b39e2b0918ddff9585835fa1e478303dd90b --- /dev/null +++ b/uc1-application/src/main/java/spesb/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java @@ -0,0 +1,23 @@ +package spesb.uc1.streamprocessing; + +import java.util.Objects; +import org.apache.kafka.streams.Topology; +import spesb.commons.kafkastreams.KafkaStreamsBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder { + private String inputTopic; // NOPMD + + public KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + @Override + protected Topology buildTopology() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + return new TopologyBuilder(this.inputTopic).build(); + } +} diff --git a/uc2-application/src/main/java/spesb/uc2/application/AggregationService.java b/uc2-application/src/main/java/spesb/uc2/application/AggregationService.java index 79d8c94c75ede32d92485d4b3c49d716ae19ccf8..bc6fdc067d7d1b36efe489ed0cf0a4e1145231af 100644 --- a/uc2-application/src/main/java/spesb/uc2/application/AggregationService.java +++ b/uc2-application/src/main/java/spesb/uc2/application/AggregationService.java @@ -4,56 +4,63 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import spesb.uc2.streamprocessing.KafkaStreamsBuilder; +import spesb.uc2.streamprocessing.Uc2KafkaStreamsBuilder; import titan.ccp.common.configuration.Configurations; /** - * A microservice that manages the history and, therefore, stores and aggregates - * incoming measurements. + * A microservice that manages the history and, therefore, stores and aggregates incoming + * measurements. * */ public class AggregationService { - private final Configuration config = Configurations.create(); - - private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); - - /** - * Start the service. - */ - public void run() { - this.createKafkaStreamsApplication(); - } - - public static void main(final String[] args) { - new AggregationService().run(); - } - - /** - * Build and start the underlying Kafka Streams Application of the service. - * - * @param clusterSession the database session which the application should use. - */ - private void createKafkaStreamsApplication() { - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) - .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .configurationTopic(this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC)) - .windowSize(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS))) - .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS))) - .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) - .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) - .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)).build(); - this.stopEvent.thenRun(kafkaStreams::close); - kafkaStreams.start(); - } - - /** - * Stop the service. - */ - public void stop() { - this.stopEvent.complete(null); - } + private final Configuration config = Configurations.create(); + + private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); + + /** + * Start the service. + */ + public void run() { + this.createKafkaStreamsApplication(); + } + + public static void main(final String[] args) { + new AggregationService().run(); + } + + /** + * Build and start the underlying Kafka Streams Application of the service. + * + * @param clusterSession the database session which the application should use. + */ + private void createKafkaStreamsApplication() { + // Use case specific stream configuration + final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(); + uc2KafkaStreamsBuilder + .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) + .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) + .configurationTopic(this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC)) + .windowSize(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS))) + .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS))); + + // Configuration of the stream application + final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) + .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) + .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) + .build(); + + this.stopEvent.thenRun(kafkaStreams::close); + kafkaStreams.start(); + } + + /** + * Stop the service. + */ + public void stop() { + this.stopEvent.complete(null); + } } diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/KafkaStreamsBuilder.java b/uc2-application/src/main/java/spesb/uc2/streamprocessing/KafkaStreamsBuilder.java deleted file mode 100644 index 9b43f5e66fb4336602c026df8941d5545f39bfb4..0000000000000000000000000000000000000000 --- a/uc2-application/src/main/java/spesb/uc2/streamprocessing/KafkaStreamsBuilder.java +++ /dev/null @@ -1,134 +0,0 @@ -package spesb.uc2.streamprocessing; - -import java.time.Duration; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; - -/** - * Builder for the Kafka Streams configuration. - */ -public class KafkaStreamsBuilder { // NOPMD builder method - - private static final String APPLICATION_NAME = "titan-ccp-aggregation"; - private static final String APPLICATION_VERSION = "0.0.1"; - - private static final Duration WINDOW_SIZE_DEFAULT = Duration.ofSeconds(1); - private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO; - - // private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class); - - private String bootstrapServers; // NOPMD - private String inputTopic; // NOPMD - private String outputTopic; // NOPMD - private String configurationTopic; // NOPMD - private Duration windowSize = null; // NOPMD - private Duration gracePeriod = null; // NOPMD - private int numThreads = -1; // NOPMD - private int commitIntervalMs = -1; // NOPMD - private int cacheMaxBytesBuffering = -1; // NOPMD - - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; - } - - public KafkaStreamsBuilder outputTopic(final String outputTopic) { - this.outputTopic = outputTopic; - return this; - } - - public KafkaStreamsBuilder configurationTopic(final String configurationTopic) { - this.configurationTopic = configurationTopic; - return this; - } - - public KafkaStreamsBuilder windowSize(final Duration windowSize) { - this.windowSize = Objects.requireNonNull(windowSize); - return this; - } - - public KafkaStreamsBuilder gracePeriod(final Duration gracePeriod) { - this.gracePeriod = Objects.requireNonNull(gracePeriod); - return this; - } - - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - return this; - } - - /** - * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus - * one for using the default. - */ - public KafkaStreamsBuilder numThreads(final int numThreads) { - if (numThreads < -1 || numThreads == 0) { - throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); - } - this.numThreads = numThreads; - return this; - } - - /** - * Sets the Kafka Streams property for the frequency with which to save the position (offsets in - * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { - if (commitIntervalMs < -1) { - throw new IllegalArgumentException("Commit interval must be greater or equal -1."); - } - this.commitIntervalMs = commitIntervalMs; - return this; - } - - /** - * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches - * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { - if (cacheMaxBytesBuffering < -1) { - throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); - } - this.cacheMaxBytesBuffering = cacheMaxBytesBuffering; - return this; - } - - /** - * Builds the {@link KafkaStreams} instance. - */ - public KafkaStreams build() { - Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); - Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set."); - // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder( - this.inputTopic, - this.outputTopic, - this.configurationTopic, - this.windowSize == null ? WINDOW_SIZE_DEFAULT : this.windowSize, - this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod); - return new KafkaStreams(topologyBuilder.build(), this.buildProperties()); - } - - private Properties buildProperties() { - final Properties properties = new Properties(); - properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); - properties.put(StreamsConfig.APPLICATION_ID_CONFIG, - APPLICATION_NAME + '-' + APPLICATION_VERSION); // TODO as parameter - if (this.numThreads > 0) { - properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads); - } - if (this.commitIntervalMs >= 0) { - properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs); - } - if (this.cacheMaxBytesBuffering >= 0) { - properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuffering); - } - return properties; - } - -} diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java b/uc2-application/src/main/java/spesb/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..875a45ee92b0ec1b229678597c20f7cbb381b7c5 --- /dev/null +++ b/uc2-application/src/main/java/spesb/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java @@ -0,0 +1,63 @@ +package spesb.uc2.streamprocessing; + +import java.time.Duration; +import java.util.Objects; +import org.apache.kafka.streams.Topology; +import spesb.commons.kafkastreams.KafkaStreamsBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD builder method + + private static final Duration WINDOW_SIZE_DEFAULT = Duration.ofSeconds(1); + private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO; + + private String inputTopic; // NOPMD + private String outputTopic; // NOPMD + private String configurationTopic; // NOPMD + private Duration windowSize; // NOPMD + private Duration gracePeriod; // NOPMD + + public Uc2KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + public Uc2KafkaStreamsBuilder outputTopic(final String outputTopic) { + this.outputTopic = outputTopic; + return this; + } + + public Uc2KafkaStreamsBuilder configurationTopic(final String configurationTopic) { + this.configurationTopic = configurationTopic; + return this; + } + + public Uc2KafkaStreamsBuilder windowSize(final Duration windowSize) { + this.windowSize = Objects.requireNonNull(windowSize); + return this; + } + + public Uc2KafkaStreamsBuilder gracePeriod(final Duration gracePeriod) { + this.gracePeriod = Objects.requireNonNull(gracePeriod); + return this; + } + + @Override + protected Topology buildTopology() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); + Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set."); + + final TopologyBuilder topologyBuilder = new TopologyBuilder( + this.inputTopic, + this.outputTopic, + this.configurationTopic, + this.windowSize == null ? WINDOW_SIZE_DEFAULT : this.windowSize, + this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod); + + return topologyBuilder.build(); + } + +} diff --git a/uc3-application/src/main/java/spesb/uc3/application/HistoryService.java b/uc3-application/src/main/java/spesb/uc3/application/HistoryService.java index 2b6c40e51a09e179778209d0626da6f6718bc07a..90f5a828e0adb030d3ecc86a2bd34bba780672b3 100644 --- a/uc3-application/src/main/java/spesb/uc3/application/HistoryService.java +++ b/uc3-application/src/main/java/spesb/uc3/application/HistoryService.java @@ -5,7 +5,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import spesb.uc3.streamprocessing.KafkaStreamsBuilder; +import spesb.uc3.streamprocessing.Uc3KafkaStreamsBuilder; import titan.ccp.common.configuration.Configurations; /** @@ -33,11 +33,16 @@ public class HistoryService { * */ private void createKafkaStreamsApplication() { - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + // Use case specific stream configuration + final Uc3KafkaStreamsBuilder uc3KafkaStreamsBuilder = new Uc3KafkaStreamsBuilder(); + uc3KafkaStreamsBuilder .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .windowDuration(Duration.ofMinutes(this.windowDurationMinutes)) + .windowDuration(Duration.ofMinutes(this.windowDurationMinutes)); + + // Configuration of the stream application + final KafkaStreams kafkaStreams = uc3KafkaStreamsBuilder + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) diff --git a/uc3-application/src/main/java/spesb/uc3/streamprocessing/KafkaStreamsBuilder.java b/uc3-application/src/main/java/spesb/uc3/streamprocessing/KafkaStreamsBuilder.java deleted file mode 100644 index 28382bedd3b02ceb2c48925212087c28ed371aad..0000000000000000000000000000000000000000 --- a/uc3-application/src/main/java/spesb/uc3/streamprocessing/KafkaStreamsBuilder.java +++ /dev/null @@ -1,105 +0,0 @@ -package spesb.uc3.streamprocessing; - -import java.time.Duration; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; -import titan.ccp.common.kafka.streams.PropertiesBuilder; - -/** - * Builder for the Kafka Streams configuration. - */ -public class KafkaStreamsBuilder { - - private static final String APPLICATION_NAME = "titan-ccp-history"; - private static final String APPLICATION_VERSION = "0.0.1"; - - // private static final Logger LOGGER = - // LoggerFactory.getLogger(KafkaStreamsBuilder.class); - - private String bootstrapServers; // NOPMD - private String inputTopic; // NOPMD - private String outputTopic; // NOPMD - private Duration windowDuration; // NOPMD - private int numThreads = -1; // NOPMD - private int commitIntervalMs = -1; // NOPMD - private int cacheMaxBytesBuff = -1; // NOPMD - - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; - } - - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - return this; - } - - public KafkaStreamsBuilder outputTopic(final String outputTopic) { - this.outputTopic = outputTopic; - return this; - } - - public KafkaStreamsBuilder windowDuration(final Duration windowDuration) { - this.windowDuration = windowDuration; - return this; - } - - /** - * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus - * one for using the default. - */ - public KafkaStreamsBuilder numThreads(final int numThreads) { - if (numThreads < -1 || numThreads == 0) { - throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); - } - this.numThreads = numThreads; - return this; - } - - /** - * Sets the Kafka Streams property for the frequency with which to save the position (offsets in - * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { - if (commitIntervalMs < -1) { - throw new IllegalArgumentException("Commit interval must be greater or equal -1."); - } - this.commitIntervalMs = commitIntervalMs; - return this; - } - - /** - * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches - * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { - if (cacheMaxBytesBuffering < -1) { - throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); - } - this.cacheMaxBytesBuff = cacheMaxBytesBuffering; - return this; - } - - /** - * Builds the {@link KafkaStreams} instance. - */ - public KafkaStreams build() { - Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, - this.windowDuration); - final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers) - .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter - .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) - .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) - .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) - // .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG") - .build(); - return new KafkaStreams(topologyBuilder.build(), properties); - } - -} diff --git a/uc3-application/src/main/java/spesb/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java b/uc3-application/src/main/java/spesb/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..b7f5d517c27ffe825161a50623bbcc0e5506c4d3 --- /dev/null +++ b/uc3-application/src/main/java/spesb/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java @@ -0,0 +1,43 @@ +package spesb.uc3.streamprocessing; + +import java.time.Duration; +import java.util.Objects; +import org.apache.kafka.streams.Topology; +import spesb.commons.kafkastreams.KafkaStreamsBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { + + private String inputTopic; // NOPMD + private String outputTopic; // NOPMD + private Duration windowDuration; // NOPMD + + public Uc3KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + public Uc3KafkaStreamsBuilder outputTopic(final String outputTopic) { + this.outputTopic = outputTopic; + return this; + } + + public Uc3KafkaStreamsBuilder windowDuration(final Duration windowDuration) { + this.windowDuration = windowDuration; + return this; + } + + @Override + protected Topology buildTopology() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); + Objects.requireNonNull(this.windowDuration, "Window duration has not been set."); + + final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, + this.windowDuration); + return topologyBuilder.build(); + } + +} diff --git a/uc4-application/src/main/java/spesb/uc4/application/HistoryService.java b/uc4-application/src/main/java/spesb/uc4/application/HistoryService.java index f86f0cb7e3bc6840db52ce7bdbbac054cdd05e13..56275d594a9fcd4c4ea5a8416da5ced2e9c52ff9 100644 --- a/uc4-application/src/main/java/spesb/uc4/application/HistoryService.java +++ b/uc4-application/src/main/java/spesb/uc4/application/HistoryService.java @@ -4,7 +4,7 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import spesb.uc4.streamprocessing.KafkaStreamsBuilder; +import spesb.uc4.streamprocessing.Uc4KafkaStreamsBuilder; import titan.ccp.common.configuration.Configurations; /** @@ -30,18 +30,24 @@ public class HistoryService { * */ private void createKafkaStreamsApplication() { - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + // Use case specific stream configuration + final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder(); + uc4KafkaStreamsBuilder .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .aggregtionDuration( Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS))) .aggregationAdvance( - Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))) + Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))); + + // Configuration of the stream application + final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .build(); + this.stopEvent.thenRun(kafkaStreams::close); kafkaStreams.start(); } diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/KafkaStreamsBuilder.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/KafkaStreamsBuilder.java deleted file mode 100644 index 9cbff4f61ec5975e3dcdfc5c4e4a9f900e6707ec..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/KafkaStreamsBuilder.java +++ /dev/null @@ -1,117 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.Duration; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; -import titan.ccp.common.kafka.streams.PropertiesBuilder; - -/** - * Builder for the Kafka Streams configuration. - */ -public class KafkaStreamsBuilder { - - private static final String APPLICATION_NAME = "titan-ccp-history"; - private static final String APPLICATION_VERSION = "0.0.1"; - - // private static final Logger LOGGER = - // LoggerFactory.getLogger(KafkaStreamsBuilder.class); - - private String bootstrapServers; // NOPMD - private String inputTopic; // NOPMD - private String outputTopic; // NOPMD - private Duration aggregtionDuration; // NOPMD - private Duration aggregationAdvance; // NOPMD - private int numThreads = -1; // NOPMD - private int commitIntervalMs = -1; // NOPMD - private int cacheMaxBytesBuff = -1; // NOPMD - - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - return this; - } - - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; - } - - public KafkaStreamsBuilder outputTopic(final String outputTopic) { - this.outputTopic = outputTopic; - return this; - } - - public KafkaStreamsBuilder aggregtionDuration(final Duration aggregtionDuration) { - this.aggregtionDuration = aggregtionDuration; - return this; - } - - public KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) { - this.aggregationAdvance = aggregationAdvance; - return this; - } - - /** - * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus - * one for using the default. - */ - public KafkaStreamsBuilder numThreads(final int numThreads) { - if (numThreads < -1 || numThreads == 0) { - throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); - } - this.numThreads = numThreads; - return this; - } - - /** - * Sets the Kafka Streams property for the frequency with which to save the position (offsets in - * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { - if (commitIntervalMs < -1) { - throw new IllegalArgumentException("Commit interval must be greater or equal -1."); - } - this.commitIntervalMs = commitIntervalMs; - return this; - } - - /** - * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches - * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { - if (cacheMaxBytesBuffering < -1) { - throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); - } - this.cacheMaxBytesBuff = cacheMaxBytesBuffering; - return this; - } - - /** - * Builds the {@link KafkaStreams} instance. - */ - public KafkaStreams build() { - Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); - Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); - Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set."); - // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder( - this.inputTopic, - this.outputTopic, - this.aggregtionDuration, - this.aggregationAdvance); - final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers) - .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter - .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) - .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) - .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) - // .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG") - .build(); - return new KafkaStreams(topologyBuilder.build(), properties); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..d248c02158befc77e42033fa6a30816cba97d816 --- /dev/null +++ b/uc4-application/src/main/java/spesb/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java @@ -0,0 +1,54 @@ +package spesb.uc4.streamprocessing; + +import java.time.Duration; +import java.util.Objects; +import org.apache.kafka.streams.Topology; +import spesb.commons.kafkastreams.KafkaStreamsBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { + + private String inputTopic; // NOPMD + private String outputTopic; // NOPMD + private Duration aggregtionDuration; // NOPMD + private Duration aggregationAdvance; // NOPMD + + public Uc4KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + public Uc4KafkaStreamsBuilder outputTopic(final String outputTopic) { + this.outputTopic = outputTopic; + return this; + } + + public Uc4KafkaStreamsBuilder aggregtionDuration(final Duration aggregtionDuration) { + this.aggregtionDuration = aggregtionDuration; + return this; + } + + public Uc4KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) { + this.aggregationAdvance = aggregationAdvance; + return this; + } + + @Override + protected Topology buildTopology() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); + Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); + Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set."); + + final TopologyBuilder topologyBuilder = new TopologyBuilder( + this.inputTopic, + this.outputTopic, + this.aggregtionDuration, + this.aggregationAdvance); + + return topologyBuilder.build(); + } + +}