diff --git a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java index 6302e4c69904aaf57e3f936ee9ad0ead11414a8d..ca1838b84a4f1b3ddf11ad4dea8e34792371974b 100644 --- a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java +++ b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java @@ -9,12 +9,6 @@ public final class ConfigurationKeys { public static final String APPLICATION_VERSION = "application.version"; - public static final String NUM_THREADS = "num.threads"; - - public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; - - public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; diff --git a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java index 8c758c24444ea9c590c364063a397f9b7bfec8f9..0bddef99292f0e87a298335651d177a9579d7ff4 100644 --- a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java +++ b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java @@ -1,7 +1,9 @@ package theodolite.commons.kafkastreams; -import java.util.Objects; import java.util.Properties; +import java.util.function.Function; +import java.util.function.Predicate; +import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; @@ -12,109 +14,92 @@ import titan.ccp.common.kafka.streams.PropertiesBuilder; */ public abstract class KafkaStreamsBuilder { - // Kafkastreams application specific - protected String schemaRegistryUrl; // NOPMD for use in subclass + // Kafka Streams application specific + protected final String schemaRegistryUrl; // NOPMD for use in subclass + protected final String inputTopic; // NOPMD for use in subclass - private String applicationName; // NOPMD - private String applicationVersion; // NOPMD - private String bootstrapServers; // NOPMD - private int numThreads = -1; // NOPMD - private int commitIntervalMs = -1; // NOPMD - private int cacheMaxBytesBuff = -1; // NOPMD + private final Configuration config; - /** - * Sets the application name for the {@code KafkaStreams} application. It is used to create the - * application ID. - * - * @param applicationName Name of the application. - * @return - */ - public KafkaStreamsBuilder applicationName(final String applicationName) { - this.applicationName = applicationName; - return this; - } - - /** - * Sets the application version for the {@code KafkaStreams} application. It is used to create the - * application ID. - * - * @param applicationVersion Version of the application. - * @return - */ - public KafkaStreamsBuilder applicationVersion(final String applicationVersion) { - this.applicationVersion = applicationVersion; - return this; - } + private final String applicationName; // NOPMD + private final String applicationVersion; // NOPMD + private final String bootstrapServers; // NOPMD /** - * Sets the bootstrap servers for the {@code KafkaStreams} application. + * Construct a new Build object for a Kafka Streams application. * - * @param bootstrapServers String for a bootstrap server. - * @return + * @param config Contains the key value pairs for configuration. */ - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - return this; + public KafkaStreamsBuilder(final Configuration config) { + this.config = config; + this.applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); + this.applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION); + this.bootstrapServers = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); + this.schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); + this.inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); } /** - * Sets the URL for the schema registry. + * Checks if the given key is contained in the configurations and sets it in the properties. * - * @param url The URL of the schema registry. - * @return + * @param <T> Type of the value for given key + * @param propBuilder Object where to set this property. + * @param key The key to check and set the property. + * @param valueGetter Method to get the value from with given key. + * @param condition for setting the property. */ - public KafkaStreamsBuilder schemaRegistry(final String url) { - this.schemaRegistryUrl = url; - return this; - } - - /** - * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus - * one for using the default. - * - * @param numThreads Number of threads. -1 for using the default. - * @return - */ - public KafkaStreamsBuilder numThreads(final int numThreads) { - if (numThreads < -1 || numThreads == 0) { - throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); + private <T> void setOptionalProperty(final PropertiesBuilder propBuilder, + final String key, + final Function<String, T> valueGetter, + final Predicate<T> condition) { + if (this.config.containsKey(key)) { + final T value = valueGetter.apply(key); + propBuilder.set(key, value, condition); } - this.numThreads = numThreads; - return this; } /** - * Sets the Kafka Streams property for the frequency with which to save the position (offsets in - * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. + * Build the {@link Properties} for a {@code KafkaStreams} application. * - * @param commitIntervalMs Frequency with which to save the position of tasks. In ms, -1 for using - * the default. - * @return + * @return A {@code Properties} object. */ - public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { - if (commitIntervalMs < -1) { - throw new IllegalArgumentException("Commit interval must be greater or equal -1."); + protected Properties buildProperties() { + // required configuration + final PropertiesBuilder propBuilder = PropertiesBuilder + .bootstrapServers(this.bootstrapServers) + .applicationId(this.applicationName + '-' + this.applicationVersion); + + // optional configurations + this.setOptionalProperty(propBuilder, StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, + this.config::getLong, + p -> p >= 0); + this.setOptionalProperty(propBuilder, StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, + this.config::getInt, p -> p > 0); + this.setOptionalProperty(propBuilder, StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, + this.config::getInt, + p -> p >= 0); + this.setOptionalProperty(propBuilder, StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, + this.config::getInt, p -> p >= 0); + this.setOptionalProperty(propBuilder, StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, + this.config::getLong, + p -> p >= 0); + this.setOptionalProperty(propBuilder, StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, + this.config::getInt, p -> p >= 1); + this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, + this.config::getInt, p -> p >= 0); + this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STREAM_THREADS_CONFIG, + this.config::getInt, p -> p > 0); + this.setOptionalProperty(propBuilder, StreamsConfig.POLL_MS_CONFIG, + this.config::getLong, + p -> p >= 0); + this.setOptionalProperty(propBuilder, StreamsConfig.REPLICATION_FACTOR_CONFIG, + this.config::getInt, p -> p >= 0); + + if (this.config.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION) + && this.config.getBoolean(StreamsConfig.TOPOLOGY_OPTIMIZATION)) { + propBuilder.set(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); } - this.commitIntervalMs = commitIntervalMs; - return this; - } - /** - * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches - * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - * - * @param cacheMaxBytesBuffering Number of memory bytes to be used for record caches across all - * threads. -1 for using the default. - * @return - */ - public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { - if (cacheMaxBytesBuffering < -1) { - throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); - } - this.cacheMaxBytesBuff = cacheMaxBytesBuffering; - return this; + return propBuilder.build(); } /** @@ -124,31 +109,10 @@ public abstract class KafkaStreamsBuilder { */ protected abstract Topology buildTopology(); - /** - * Build the {@link Properties} for a {@code KafkaStreams} application. - * - * @return A {@code Properties} object. - */ - protected Properties buildProperties() { - return PropertiesBuilder - .bootstrapServers(this.bootstrapServers) - .applicationId(this.applicationName + '-' + this.applicationVersion) - .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) - .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) - .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) - .build(); - } - /** * Builds the {@link KafkaStreams} instance. */ public KafkaStreams build() { - // Check for required attributes for building properties. - Objects.requireNonNull(this.applicationName, "Application name has not been set."); - Objects.requireNonNull(this.applicationVersion, "Application version has not been set."); - Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set."); - Objects.requireNonNull(this.schemaRegistryUrl, "Schema registry has not been set."); - // Create the Kafka streams instance. return new KafkaStreams(this.buildTopology(), this.buildProperties()); } diff --git a/build.gradle b/build.gradle index e0c733b246f9434032e701d1a12533b0862ae884..3cb86b68e9d37c53572c6611fad1057b5505e9cc 100644 --- a/build.gradle +++ b/build.gradle @@ -101,6 +101,7 @@ configure(commonProjects) { implementation 'org.slf4j:slf4j-simple:1.7.25' implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } + implementation 'org.apache.kafka:kafka-streams:2.6.0' // Use JUnit test framework testImplementation 'junit:junit:4.12' diff --git a/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java b/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java index a35cc37b36fb906e5c5495006126374d4de4656c..f0d8062a2442181507c0bef990b73e0e9cf4a372 100644 --- a/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java +++ b/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java @@ -3,7 +3,6 @@ package theodolite.uc1.application; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.uc1.streamprocessing.Uc1KafkaStreamsBuilder; import titan.ccp.common.configuration.ServiceConfigurations; @@ -31,18 +30,9 @@ public class HistoryService { */ private void createKafkaStreamsApplication() { - final Uc1KafkaStreamsBuilder uc1KafkaStreamsBuilder = new Uc1KafkaStreamsBuilder(); - uc1KafkaStreamsBuilder.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)); - - final KafkaStreams kafkaStreams = uc1KafkaStreamsBuilder - .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) - .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) - .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) - .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) - .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) - .build(); + final Uc1KafkaStreamsBuilder uc1KafkaStreamsBuilder = new Uc1KafkaStreamsBuilder(this.config); + + final KafkaStreams kafkaStreams = uc1KafkaStreamsBuilder.build(); this.stopEvent.thenRun(kafkaStreams::close); diff --git a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java index 7699ecb48369a2041777b901931c46072a10d99f..14335282863bff5a170716b228ea363e3d739685 100644 --- a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java +++ b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java @@ -1,6 +1,7 @@ package theodolite.uc1.streamprocessing; import java.util.Objects; +import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; @@ -9,11 +10,9 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; * Builder for the Kafka Streams configuration. */ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder { - private String inputTopic; // NOPMD - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; + public Uc1KafkaStreamsBuilder(final Configuration config) { + super(config); } @Override diff --git a/uc1-application/src/main/resources/META-INF/application.properties b/uc1-application/src/main/resources/META-INF/application.properties index 3fb301516daa4c7e14875d3d9ca9df9c770eb69e..b46e6246e248cc524c5b6249348c76ded6ec468b 100644 --- a/uc1-application/src/main/resources/META-INF/application.properties +++ b/uc1-application/src/main/resources/META-INF/application.properties @@ -3,10 +3,6 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input -kafka.output.topic=output schema.registry.url=http://localhost:8091 -num.threads=1 -commit.interval.ms=100 -cache.max.bytes.buffering=-1 diff --git a/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java index c094adfcd7952e81115dae84ed9c0d371e380c98..2f828278f5a3033c3e479bf82f3c8c5d9d4c380c 100644 --- a/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java +++ b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java @@ -36,26 +36,15 @@ public class AggregationService { * @param clusterSession the database session which the application should use. */ private void createKafkaStreamsApplication() { - // Use case specific stream configuration - final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(); + final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(this.config); uc2KafkaStreamsBuilder - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .feedbackTopic(this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .configurationTopic(this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC)) .emitPeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS))) .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS))); - // Configuration of the stream application - final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder - .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) - .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) - .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) - .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) - .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) - .build(); + final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder.build(); this.stopEvent.thenRun(kafkaStreams::close); kafkaStreams.start(); diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java index 16addb8510eec2254d4787edbfbfbe186996fdea..1a606ee3df5e6ac2f43b650afe4a9aed036df9cd 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java @@ -2,6 +2,7 @@ package theodolite.uc2.streamprocessing; import java.time.Duration; import java.util.Objects; +import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; @@ -14,16 +15,14 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build private static final Duration EMIT_PERIOD_DEFAULT = Duration.ofSeconds(1); private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO; - private String inputTopic; // NOPMD private String feedbackTopic; // NOPMD private String outputTopic; // NOPMD private String configurationTopic; // NOPMD private Duration emitPeriod; // NOPMD private Duration gracePeriod; // NOPMD - public Uc2KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; + public Uc2KafkaStreamsBuilder(final Configuration config) { + super(config); } public Uc2KafkaStreamsBuilder feedbackTopic(final String feedbackTopic) { diff --git a/uc2-application/src/main/resources/META-INF/application.properties b/uc2-application/src/main/resources/META-INF/application.properties index 10c47960adb012ba5c572e3833a37d821189eb8e..8f1af5f590eff7f2b12706d61a7c89d9152f7949 100644 --- a/uc2-application/src/main/resources/META-INF/application.properties +++ b/uc2-application/src/main/resources/META-INF/application.properties @@ -10,8 +10,4 @@ kafka.output.topic=output schema.registry.url=http://localhost:8091 emit.period.ms=5000 -grace.period.ms=0 - -num.threads=1 -commit.interval.ms=100 -cache.max.bytes.buffering=-1 +grace.period.ms=0 \ No newline at end of file diff --git a/uc3-application/src/main/java/theodolite/uc3/application/ConfigurationKeys.java b/uc3-application/src/main/java/theodolite/uc3/application/ConfigurationKeys.java deleted file mode 100644 index ab6f08c017bb78a72c4896d766b38f7b8485c7fb..0000000000000000000000000000000000000000 --- a/uc3-application/src/main/java/theodolite/uc3/application/ConfigurationKeys.java +++ /dev/null @@ -1,29 +0,0 @@ -package theodolite.uc3.application; - -/** - * Keys to access configuration parameters. - */ -public final class ConfigurationKeys { - - public static final String APPLICATION_NAME = "application.name"; - - public static final String APPLICATION_VERSION = "application.version"; - - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - - public static final String NUM_THREADS = "num.threads"; - - public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; - - public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; - - public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes"; - - private ConfigurationKeys() { - } - -} diff --git a/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java b/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java index b245b1645c9e5ee68df3f108802c9b91d70cf017..349512f988bb182d8851e458a1bce244c756bbfe 100644 --- a/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java +++ b/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java @@ -34,23 +34,13 @@ public class HistoryService { * */ private void createKafkaStreamsApplication() { - // Use case specific stream configuration - final Uc3KafkaStreamsBuilder uc3KafkaStreamsBuilder = new Uc3KafkaStreamsBuilder(); + final Uc3KafkaStreamsBuilder uc3KafkaStreamsBuilder = new Uc3KafkaStreamsBuilder(this.config); uc3KafkaStreamsBuilder - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .windowDuration(Duration.ofMinutes(this.windowDurationMinutes)); - // Configuration of the stream application - final KafkaStreams kafkaStreams = uc3KafkaStreamsBuilder - .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) - .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) - .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) - .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) - .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) - .build(); + final KafkaStreams kafkaStreams = uc3KafkaStreamsBuilder.build(); + this.stopEvent.thenRun(kafkaStreams::close); kafkaStreams.start(); } diff --git a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java index e74adf7c87673cc0e6ea4004dbcb1c0a6fc907ac..9ab4ea0a96c663af09008bd5358066ca3f8520ac 100644 --- a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java +++ b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java @@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing; import java.time.Duration; import java.util.Objects; +import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; @@ -11,13 +12,11 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; */ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { - private String inputTopic; // NOPMD private String outputTopic; // NOPMD private Duration windowDuration; // NOPMD - public Uc3KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; + public Uc3KafkaStreamsBuilder(final Configuration config) { + super(config); } public Uc3KafkaStreamsBuilder outputTopic(final String outputTopic) { diff --git a/uc3-application/src/main/resources/META-INF/application.properties b/uc3-application/src/main/resources/META-INF/application.properties index 2ceaf37224b0bff54b09beaabe29210216e11671..011406f7ef1e23647eeae150d349f472214cbcd4 100644 --- a/uc3-application/src/main/resources/META-INF/application.properties +++ b/uc3-application/src/main/resources/META-INF/application.properties @@ -7,7 +7,3 @@ kafka.output.topic=output kafka.window.duration.minutes=1 schema.registry.url=http://localhost:8091 - -num.threads=1 -commit.interval.ms=100 -cache.max.bytes.buffering=-1 diff --git a/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java index 23af805733de2bb3f6384fa924a2322490ee58d9..12f35e8dcc532b19e470722094ba5aff07420ad2 100644 --- a/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java +++ b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java @@ -32,9 +32,8 @@ public class HistoryService { */ private void createKafkaStreamsApplication() { // Use case specific stream configuration - final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder(); + final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder(this.config); uc4KafkaStreamsBuilder - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .aggregtionDuration( Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS))) @@ -42,15 +41,7 @@ public class HistoryService { Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))); // Configuration of the stream application - final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder - .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) - .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) - .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) - .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) - .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) - .build(); + final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder.build(); this.stopEvent.thenRun(kafkaStreams::close); kafkaStreams.start(); diff --git a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java index 7c9e2c4f790cf1fbb7dd34db573576d1e64077db..bbbb043119857612b1a8b0c60e3a5466cd68447e 100644 --- a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java @@ -2,6 +2,7 @@ package theodolite.uc4.streamprocessing; import java.time.Duration; import java.util.Objects; +import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; @@ -11,14 +12,12 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; */ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { - private String inputTopic; // NOPMD private String outputTopic; // NOPMD private Duration aggregtionDuration; // NOPMD private Duration aggregationAdvance; // NOPMD - public Uc4KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; + public Uc4KafkaStreamsBuilder(final Configuration config) { + super(config); } public Uc4KafkaStreamsBuilder outputTopic(final String outputTopic) { diff --git a/uc4-application/src/main/resources/META-INF/application.properties b/uc4-application/src/main/resources/META-INF/application.properties index e577c880a8ff8169699acb8598e323b8671e8d5e..b46681533e63bf86a51439778a46940da348559d 100644 --- a/uc4-application/src/main/resources/META-INF/application.properties +++ b/uc4-application/src/main/resources/META-INF/application.properties @@ -8,7 +8,3 @@ aggregation.duration.days=30 aggregation.advance.days=1 schema.registry.url=http://localhost:8091 - -num.threads=1 -commit.interval.ms=100 -cache.max.bytes.buffering=-1