From 29f14d89023e1459d40b5d77ab477cae83476dd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de> Date: Thu, 14 May 2020 15:03:29 +0200 Subject: [PATCH] Refactor uc1 kafka streams application - add application name and version as a property - checkstyle cleanups --- .../uc1/application/ConfigurationKeys.java | 18 +++++++------ .../spesb/uc1/application/HistoryService.java | 8 ++++-- .../streamprocessing/KafkaStreamsBuilder.java | 26 ++++++++++++------- .../uc1/streamprocessing/TopologyBuilder.java | 5 +--- .../resources/META-INF/application.properties | 4 +++ 5 files changed, 37 insertions(+), 24 deletions(-) diff --git a/uc1-application/src/main/java/spesb/uc1/application/ConfigurationKeys.java b/uc1-application/src/main/java/spesb/uc1/application/ConfigurationKeys.java index 7a275cb33..f74ce3187 100644 --- a/uc1-application/src/main/java/spesb/uc1/application/ConfigurationKeys.java +++ b/uc1-application/src/main/java/spesb/uc1/application/ConfigurationKeys.java @@ -5,19 +5,21 @@ package spesb.uc1.application; */ public final class ConfigurationKeys { - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + public static final String APPLICATION_NAME = "application.name"; - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + public static final String APPLICATION_VERSION = "application.version"; - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + public static final String NUM_THREADS = "num.threads"; - public static final String NUM_THREADS = "num.threads"; + public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; - public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; + public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; - public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - private ConfigurationKeys() { - } + public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + + + private ConfigurationKeys() {} } diff --git a/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java b/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java index 18a39da72..438ef510d 100644 --- a/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java +++ b/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java @@ -31,13 +31,17 @@ public class HistoryService { private void createKafkaStreamsApplication() { final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) + .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) + .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .build(); + this.stopEvent.thenRun(kafkaStreams::close); + kafkaStreams.start(); } diff --git a/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java b/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java index 4b7f487c8..a289462a5 100644 --- a/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java +++ b/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java @@ -10,20 +10,23 @@ import titan.ccp.common.kafka.streams.PropertiesBuilder; * Builder for the Kafka Streams configuration. */ public class KafkaStreamsBuilder { - - private static final String APPLICATION_NAME = "titan-ccp-history"; - private static final String APPLICATION_VERSION = "0.0.1"; - // private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class); + private String applicationName = "uc1-application"; // NOPMD + private String applicationVersion = "0.0.1"; // NOPMD private String bootstrapServers; // NOPMD private String inputTopic; // NOPMD private int numThreads = -1; // NOPMD private int commitIntervalMs = -1; // NOPMD private int cacheMaxBytesBuff = -1; // NOPMD - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; + public KafkaStreamsBuilder applicationName(final String applicationName) { + this.applicationName = applicationName; + return this; + } + + public KafkaStreamsBuilder applicationVersion(final String applicationVersion) { + this.applicationVersion = applicationVersion; return this; } @@ -32,6 +35,11 @@ public class KafkaStreamsBuilder { return this; } + public KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + /** * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus * one for using the default. @@ -76,15 +84,13 @@ public class KafkaStreamsBuilder { public KafkaStreams build() { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder( - this.inputTopic); + final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic); final Properties properties = PropertiesBuilder .bootstrapServers(this.bootstrapServers) - .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter + .applicationId(this.applicationName + '-' + this.applicationVersion) .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) - // .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG") .build(); return new KafkaStreams(topologyBuilder.build(), properties); } diff --git a/uc1-application/src/main/java/spesb/uc1/streamprocessing/TopologyBuilder.java b/uc1-application/src/main/java/spesb/uc1/streamprocessing/TopologyBuilder.java index 279b70d0b..1705e3d81 100644 --- a/uc1-application/src/main/java/spesb/uc1/streamprocessing/TopologyBuilder.java +++ b/uc1-application/src/main/java/spesb/uc1/streamprocessing/TopologyBuilder.java @@ -18,8 +18,7 @@ public class TopologyBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); private final String inputTopic; - private final Gson gson; - + private final Gson gson = new Gson(); private final StreamsBuilder builder = new StreamsBuilder(); /** @@ -27,14 +26,12 @@ public class TopologyBuilder { */ public TopologyBuilder(final String inputTopic) { this.inputTopic = inputTopic; - this.gson = new Gson(); } /** * Build the {@link Topology} for the History microservice. */ public Topology build() { - this.builder .stream(this.inputTopic, Consumed.with( Serdes.String(), diff --git a/uc1-application/src/main/resources/META-INF/application.properties b/uc1-application/src/main/resources/META-INF/application.properties index ef279332f..8f029be66 100644 --- a/uc1-application/src/main/resources/META-INF/application.properties +++ b/uc1-application/src/main/resources/META-INF/application.properties @@ -1,6 +1,10 @@ +application.name="uc1-application" +application.version="0.0.1" + kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output + num.threads=1 commit.interval.ms=100 cache.max.bytes.buffering=-1 -- GitLab