Skip to content
Snippets Groups Projects
Commit 7a881fcb authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Extract configuration keys of kafkastream apps to commons project

In order to not add a new configuration key that is needed by
all the kafka stream use case applications extract the configuration
keys to the kafkastreams common project.
parent cb59dae2
No related branches found
No related tags found
2 merge requests!28Use Titan CC Avro Records in UC App and Workload Generator,!13Migrate to new Titan CC records
package theodolite.uc2.application;
package theodolite.commons.kafkastreams;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
// Common keys
public static final String APPLICATION_NAME = "application.name";
public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic";
public static final String APPLICATION_VERSION = "application.version";
public static final String NUM_THREADS = "num.threads";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
// Additional topics
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic";
// UC2
public static final String WINDOW_SIZE_MS = "window.size.ms";
public static final String WINDOW_GRACE_MS = "window.grace.ms";
public static final String NUM_THREADS = "num.threads";
// UC4
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
private ConfigurationKeys() {}
......
package theodolite.uc1.application;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
public static final String APPLICATION_NAME = "application.name";
public static final String APPLICATION_VERSION = "application.version";
public static final String NUM_THREADS = "num.threads";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
private ConfigurationKeys() {}
}
......@@ -3,6 +3,7 @@ package theodolite.uc1.application;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams;
import theodolite.commons.kafkastreams.ConfigurationKeys;
import theodolite.uc1.streamprocessing.Uc1KafkaStreamsBuilder;
import titan.ccp.common.configuration.ServiceConfigurations;
......
......@@ -4,6 +4,7 @@ import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams;
import theodolite.commons.kafkastreams.ConfigurationKeys;
import theodolite.uc2.streamprocessing.Uc2KafkaStreamsBuilder;
import titan.ccp.common.configuration.ServiceConfigurations;
......
package theodolite.uc3.application;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
public static final String NUM_THREADS = "num.threads";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
private ConfigurationKeys() {
}
}
......@@ -5,6 +5,7 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams;
import theodolite.commons.kafkastreams.ConfigurationKeys;
import theodolite.uc3.streamprocessing.Uc3KafkaStreamsBuilder;
import titan.ccp.common.configuration.Configurations;
......
package theodolite.uc4.application;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
public static final String NUM_THREADS = "num.threads";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
private ConfigurationKeys() {}
}
......@@ -4,6 +4,7 @@ import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams;
import theodolite.commons.kafkastreams.ConfigurationKeys;
import theodolite.uc4.streamprocessing.Uc4KafkaStreamsBuilder;
import titan.ccp.common.configuration.Configurations;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment