Skip to content
Snippets Groups Projects
Commit 6756d19b authored by Sören Henning's avatar Sören Henning
Browse files

Make schema registry url configurable

parent 0bd6372c
No related branches found
No related tags found
No related merge requests found
Pipeline #369 passed
package uc4.application; package uc4.application;
import java.time.Duration;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
...@@ -18,8 +17,8 @@ public class HistoryService { ...@@ -18,8 +17,8 @@ public class HistoryService {
private final Configuration config = Configurations.create(); private final Configuration config = Configurations.create();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
final int KAFKA_WINDOW_DURATION_MINUTES = Integer
.parseInt(Objects.requireNonNullElse(System.getenv("KAFKA_WINDOW_DURATION_MINUTES"), "60")); final String schemaRegistry = Objects.requireNonNull(System.getenv("SCHEMA_REGISTRY_URL"));
/** /**
* Start the service. * Start the service.
...@@ -37,9 +36,9 @@ public class HistoryService { ...@@ -37,9 +36,9 @@ public class HistoryService {
private void createKafkaStreamsApplication() { private void createKafkaStreamsApplication() {
final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() final KafkaStreams kafkaStreams = new KafkaStreamsBuilder()
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.schemaRegistry(this.schemaRegistry)
.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
.outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
.windowDuration(Duration.ofMinutes(this.KAFKA_WINDOW_DURATION_MINUTES))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
......
...@@ -22,6 +22,7 @@ public class KafkaStreamsBuilder { ...@@ -22,6 +22,7 @@ public class KafkaStreamsBuilder {
private String inputTopic; // NOPMD private String inputTopic; // NOPMD
private String outputTopic; // NOPMD private String outputTopic; // NOPMD
private Duration windowDuration; // NOPMD private Duration windowDuration; // NOPMD
private String schemaRegistryUrl; // NOPMD
private int numThreads = -1; // NOPMD private int numThreads = -1; // NOPMD
private int commitIntervalMs = -1; // NOPMD private int commitIntervalMs = -1; // NOPMD
private int cacheMaxBytesBuff = -1; // NOPMD private int cacheMaxBytesBuff = -1; // NOPMD
...@@ -36,13 +37,13 @@ public class KafkaStreamsBuilder { ...@@ -36,13 +37,13 @@ public class KafkaStreamsBuilder {
return this; return this;
} }
public KafkaStreamsBuilder outputTopic(final String outputTopic) { public KafkaStreamsBuilder schemaRegistry(final String url) {
this.outputTopic = outputTopic; this.schemaRegistryUrl = url;
return this; return this;
} }
public KafkaStreamsBuilder windowDuration(final Duration windowDuration) { public KafkaStreamsBuilder outputTopic(final String outputTopic) {
this.windowDuration = windowDuration; this.outputTopic = outputTopic;
return this; return this;
} }
...@@ -91,7 +92,7 @@ public class KafkaStreamsBuilder { ...@@ -91,7 +92,7 @@ public class KafkaStreamsBuilder {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
// TODO log parameters // TODO log parameters
final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
this.windowDuration, null); this.windowDuration, this.schemaRegistryUrl);
final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers) final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers)
.applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter
.set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment