From 7a881fcb95cf8395eb909677abfeec3e99793b5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de> Date: Sat, 30 May 2020 10:12:08 +0200 Subject: [PATCH] Extract configuration keys of kafkastream apps to commons project In order to not add a new configuration key that is needed by all the kafka stream use case applications extract the configuration keys to the kafkastreams common project. --- .../kafkastreams}/ConfigurationKeys.java | 28 +++++++++++++------ .../uc1/application/ConfigurationKeys.java | 27 ------------------ .../uc1/application/HistoryService.java | 1 + .../uc2/application/AggregationService.java | 1 + .../uc3/application/ConfigurationKeys.java | 25 ----------------- .../uc3/application/HistoryService.java | 1 + .../uc4/application/ConfigurationKeys.java | 28 ------------------- .../uc4/application/HistoryService.java | 1 + 8 files changed, 24 insertions(+), 88 deletions(-) rename {uc2-application/src/main/java/theodolite/uc2/application => application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams}/ConfigurationKeys.java (67%) delete mode 100644 uc1-application/src/main/java/theodolite/uc1/application/ConfigurationKeys.java delete mode 100644 uc3-application/src/main/java/theodolite/uc3/application/ConfigurationKeys.java delete mode 100644 uc4-application/src/main/java/theodolite/uc4/application/ConfigurationKeys.java diff --git a/uc2-application/src/main/java/theodolite/uc2/application/ConfigurationKeys.java b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java similarity index 67% rename from uc2-application/src/main/java/theodolite/uc2/application/ConfigurationKeys.java rename to application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java index 5bfb3dc7c..af05c51bd 100644 --- a/uc2-application/src/main/java/theodolite/uc2/application/ConfigurationKeys.java +++ b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java @@ -1,29 +1,41 @@ -package theodolite.uc2.application; +package theodolite.commons.kafkastreams; /** * Keys to access configuration parameters. */ public final class ConfigurationKeys { + // Common keys + public static final String APPLICATION_NAME = "application.name"; - public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic"; + public static final String APPLICATION_VERSION = "application.version"; + + public static final String NUM_THREADS = "num.threads"; + + public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; + + public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + // Additional topics + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic"; + + // UC2 public static final String WINDOW_SIZE_MS = "window.size.ms"; public static final String WINDOW_GRACE_MS = "window.grace.ms"; - public static final String NUM_THREADS = "num.threads"; + // UC4 + public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; - public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; - - public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; + public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; - public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; private ConfigurationKeys() {} diff --git a/uc1-application/src/main/java/theodolite/uc1/application/ConfigurationKeys.java b/uc1-application/src/main/java/theodolite/uc1/application/ConfigurationKeys.java deleted file mode 100644 index ec2e023e1..000000000 --- a/uc1-application/src/main/java/theodolite/uc1/application/ConfigurationKeys.java +++ /dev/null @@ -1,27 +0,0 @@ -package theodolite.uc1.application; - -/** - * Keys to access configuration parameters. - */ -public final class ConfigurationKeys { - - public static final String APPLICATION_NAME = "application.name"; - - public static final String APPLICATION_VERSION = "application.version"; - - public static final String NUM_THREADS = "num.threads"; - - public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; - - public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; - - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - - public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - - - private ConfigurationKeys() {} - -} diff --git a/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java b/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java index 8f54267e8..a35cc37b3 100644 --- a/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java +++ b/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java @@ -3,6 +3,7 @@ package theodolite.uc1.application; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; +import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.uc1.streamprocessing.Uc1KafkaStreamsBuilder; import titan.ccp.common.configuration.ServiceConfigurations; diff --git a/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java index 74ab2c217..1be3fb8da 100644 --- a/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java +++ b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java @@ -4,6 +4,7 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; +import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.uc2.streamprocessing.Uc2KafkaStreamsBuilder; import titan.ccp.common.configuration.ServiceConfigurations; diff --git a/uc3-application/src/main/java/theodolite/uc3/application/ConfigurationKeys.java b/uc3-application/src/main/java/theodolite/uc3/application/ConfigurationKeys.java deleted file mode 100644 index d95d245e0..000000000 --- a/uc3-application/src/main/java/theodolite/uc3/application/ConfigurationKeys.java +++ /dev/null @@ -1,25 +0,0 @@ -package theodolite.uc3.application; - -/** - * Keys to access configuration parameters. - */ -public final class ConfigurationKeys { - - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - - public static final String NUM_THREADS = "num.threads"; - - public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; - - public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; - - public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes"; - - private ConfigurationKeys() { - } - -} diff --git a/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java b/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java index 916d64f01..db7ec3ba6 100644 --- a/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java +++ b/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java @@ -5,6 +5,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; +import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.uc3.streamprocessing.Uc3KafkaStreamsBuilder; import titan.ccp.common.configuration.Configurations; diff --git a/uc4-application/src/main/java/theodolite/uc4/application/ConfigurationKeys.java b/uc4-application/src/main/java/theodolite/uc4/application/ConfigurationKeys.java deleted file mode 100644 index aa74e1552..000000000 --- a/uc4-application/src/main/java/theodolite/uc4/application/ConfigurationKeys.java +++ /dev/null @@ -1,28 +0,0 @@ -package theodolite.uc4.application; - -/** - * Keys to access configuration parameters. - */ -public final class ConfigurationKeys { - - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - - public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; - - public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; - - public static final String NUM_THREADS = "num.threads"; - - public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; - - public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; - - public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes"; - - private ConfigurationKeys() {} - -} diff --git a/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java index 4d686d8f7..a27ea6711 100644 --- a/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java +++ b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java @@ -4,6 +4,7 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; +import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.uc4.streamprocessing.Uc4KafkaStreamsBuilder; import titan.ccp.common.configuration.Configurations; -- GitLab