Skip to content
Snippets Groups Projects
Commit 29f14d89 authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Refactor uc1 kafka streams application

- add application name and version as a property
- checkstyle cleanups
parent e8e74457
No related branches found
No related tags found
1 merge request!4Feature/cleanup kafka streams
......@@ -5,19 +5,21 @@ package spesb.uc1.application;
*/
public final class ConfigurationKeys {
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String APPLICATION_NAME = "application.name";
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String APPLICATION_VERSION = "application.version";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
public static final String NUM_THREADS = "num.threads";
public static final String NUM_THREADS = "num.threads";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
private ConfigurationKeys() {
}
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
private ConfigurationKeys() {}
}
......@@ -31,13 +31,17 @@ public class HistoryService {
private void createKafkaStreamsApplication() {
final KafkaStreams kafkaStreams = new KafkaStreamsBuilder()
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
.applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME))
.applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
.build();
this.stopEvent.thenRun(kafkaStreams::close);
kafkaStreams.start();
}
......
......@@ -10,20 +10,23 @@ import titan.ccp.common.kafka.streams.PropertiesBuilder;
* Builder for the Kafka Streams configuration.
*/
public class KafkaStreamsBuilder {
private static final String APPLICATION_NAME = "titan-ccp-history";
private static final String APPLICATION_VERSION = "0.0.1";
// private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class);
private String applicationName = "uc1-application"; // NOPMD
private String applicationVersion = "0.0.1"; // NOPMD
private String bootstrapServers; // NOPMD
private String inputTopic; // NOPMD
private int numThreads = -1; // NOPMD
private int commitIntervalMs = -1; // NOPMD
private int cacheMaxBytesBuff = -1; // NOPMD
public KafkaStreamsBuilder inputTopic(final String inputTopic) {
this.inputTopic = inputTopic;
public KafkaStreamsBuilder applicationName(final String applicationName) {
this.applicationName = applicationName;
return this;
}
public KafkaStreamsBuilder applicationVersion(final String applicationVersion) {
this.applicationVersion = applicationVersion;
return this;
}
......@@ -32,6 +35,11 @@ public class KafkaStreamsBuilder {
return this;
}
public KafkaStreamsBuilder inputTopic(final String inputTopic) {
this.inputTopic = inputTopic;
return this;
}
/**
* Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus
* one for using the default.
......@@ -76,15 +84,13 @@ public class KafkaStreamsBuilder {
public KafkaStreams build() {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
// TODO log parameters
final TopologyBuilder topologyBuilder = new TopologyBuilder(
this.inputTopic);
final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic);
final Properties properties = PropertiesBuilder
.bootstrapServers(this.bootstrapServers)
.applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter
.applicationId(this.applicationName + '-' + this.applicationVersion)
.set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
.set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0)
.set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0)
// .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
.build();
return new KafkaStreams(topologyBuilder.build(), properties);
}
......
......@@ -18,8 +18,7 @@ public class TopologyBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final String inputTopic;
private final Gson gson;
private final Gson gson = new Gson();
private final StreamsBuilder builder = new StreamsBuilder();
/**
......@@ -27,14 +26,12 @@ public class TopologyBuilder {
*/
public TopologyBuilder(final String inputTopic) {
this.inputTopic = inputTopic;
this.gson = new Gson();
}
/**
* Build the {@link Topology} for the History microservice.
*/
public Topology build() {
this.builder
.stream(this.inputTopic, Consumed.with(
Serdes.String(),
......
application.name="uc1-application"
application.version="0.0.1"
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
num.threads=1
commit.interval.ms=100
cache.max.bytes.buffering=-1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment