diff --git a/uc1-application/src/main/java/spesb/uc1/application/ConfigurationKeys.java b/uc1-application/src/main/java/spesb/uc1/application/ConfigurationKeys.java index 7a275cb33a4cd35d228d8ca33ebb7303b251271b..f74ce318713b75cbe5e6da5d523d5811042220f3 100644 --- a/uc1-application/src/main/java/spesb/uc1/application/ConfigurationKeys.java +++ b/uc1-application/src/main/java/spesb/uc1/application/ConfigurationKeys.java @@ -5,19 +5,21 @@ package spesb.uc1.application; */ public final class ConfigurationKeys { - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + public static final String APPLICATION_NAME = "application.name"; - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + public static final String APPLICATION_VERSION = "application.version"; - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + public static final String NUM_THREADS = "num.threads"; - public static final String NUM_THREADS = "num.threads"; + public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; - public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; + public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; - public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - private ConfigurationKeys() { - } + public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + + + private ConfigurationKeys() {} } diff --git a/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java b/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java index 18a39da7229d961249be900eeeff679e267a1eef..438ef510daa9af084ede9f9ef01e8003ad70efb5 100644 --- a/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java +++ b/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java @@ -31,13 +31,17 @@ public class HistoryService { private void createKafkaStreamsApplication() { final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) + .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) + .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .build(); + this.stopEvent.thenRun(kafkaStreams::close); + kafkaStreams.start(); } diff --git a/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java b/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java index 4b7f487c8e848f0b1d6d652b7d86a8c50c202af1..a289462a5b6a1f05c347dc66109633046b462c44 100644 --- a/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java +++ b/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java @@ -10,20 +10,23 @@ import titan.ccp.common.kafka.streams.PropertiesBuilder; * Builder for the Kafka Streams configuration. */ public class KafkaStreamsBuilder { - - private static final String APPLICATION_NAME = "titan-ccp-history"; - private static final String APPLICATION_VERSION = "0.0.1"; - // private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class); + private String applicationName = "uc1-application"; // NOPMD + private String applicationVersion = "0.0.1"; // NOPMD private String bootstrapServers; // NOPMD private String inputTopic; // NOPMD private int numThreads = -1; // NOPMD private int commitIntervalMs = -1; // NOPMD private int cacheMaxBytesBuff = -1; // NOPMD - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; + public KafkaStreamsBuilder applicationName(final String applicationName) { + this.applicationName = applicationName; + return this; + } + + public KafkaStreamsBuilder applicationVersion(final String applicationVersion) { + this.applicationVersion = applicationVersion; return this; } @@ -32,6 +35,11 @@ public class KafkaStreamsBuilder { return this; } + public KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + /** * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus * one for using the default. @@ -76,15 +84,13 @@ public class KafkaStreamsBuilder { public KafkaStreams build() { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder( - this.inputTopic); + final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic); final Properties properties = PropertiesBuilder .bootstrapServers(this.bootstrapServers) - .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter + .applicationId(this.applicationName + '-' + this.applicationVersion) .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) - // .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG") .build(); return new KafkaStreams(topologyBuilder.build(), properties); } diff --git a/uc1-application/src/main/java/spesb/uc1/streamprocessing/TopologyBuilder.java b/uc1-application/src/main/java/spesb/uc1/streamprocessing/TopologyBuilder.java index 279b70d0b7311f2b45b986e54cdf5b6c81c28263..1705e3d81a95ff3ac85cef84e32218093296b065 100644 --- a/uc1-application/src/main/java/spesb/uc1/streamprocessing/TopologyBuilder.java +++ b/uc1-application/src/main/java/spesb/uc1/streamprocessing/TopologyBuilder.java @@ -18,8 +18,7 @@ public class TopologyBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); private final String inputTopic; - private final Gson gson; - + private final Gson gson = new Gson(); private final StreamsBuilder builder = new StreamsBuilder(); /** @@ -27,14 +26,12 @@ public class TopologyBuilder { */ public TopologyBuilder(final String inputTopic) { this.inputTopic = inputTopic; - this.gson = new Gson(); } /** * Build the {@link Topology} for the History microservice. */ public Topology build() { - this.builder .stream(this.inputTopic, Consumed.with( Serdes.String(), diff --git a/uc1-application/src/main/resources/META-INF/application.properties b/uc1-application/src/main/resources/META-INF/application.properties index ef279332f911108fa8ca42d840d4a147460e8e35..8f029be66f9decadc87c8e88f58698d1422d596d 100644 --- a/uc1-application/src/main/resources/META-INF/application.properties +++ b/uc1-application/src/main/resources/META-INF/application.properties @@ -1,6 +1,10 @@ +application.name="uc1-application" +application.version="0.0.1" + kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output + num.threads=1 commit.interval.ms=100 cache.max.bytes.buffering=-1