diff --git a/uc4-application/src/main/java/uc4/application/HistoryService.java b/uc4-application/src/main/java/uc4/application/HistoryService.java index 8e6020b58ac04b0fc75be8231d99d6e3c224b432..46c6aa3cca6df52aa9f770e46566b9c3eb3720ff 100644 --- a/uc4-application/src/main/java/uc4/application/HistoryService.java +++ b/uc4-application/src/main/java/uc4/application/HistoryService.java @@ -1,6 +1,5 @@ package uc4.application; -import java.time.Duration; import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; @@ -18,8 +17,8 @@ public class HistoryService { private final Configuration config = Configurations.create(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); - final int KAFKA_WINDOW_DURATION_MINUTES = Integer - .parseInt(Objects.requireNonNullElse(System.getenv("KAFKA_WINDOW_DURATION_MINUTES"), "60")); + + final String schemaRegistry = Objects.requireNonNull(System.getenv("SCHEMA_REGISTRY_URL")); /** * Start the service. @@ -37,9 +36,9 @@ public class HistoryService { private void createKafkaStreamsApplication() { final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .schemaRegistry(this.schemaRegistry) .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .windowDuration(Duration.ofMinutes(this.KAFKA_WINDOW_DURATION_MINUTES)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) diff --git a/uc4-application/src/main/java/uc4/streamprocessing/KafkaStreamsBuilder.java b/uc4-application/src/main/java/uc4/streamprocessing/KafkaStreamsBuilder.java index c69f8bca2d8b068e329dcf81a00345f7e05b0aa0..a5e71ebb3318c1c8a92328b56f1eaae421172873 100644 --- a/uc4-application/src/main/java/uc4/streamprocessing/KafkaStreamsBuilder.java +++ b/uc4-application/src/main/java/uc4/streamprocessing/KafkaStreamsBuilder.java @@ -22,6 +22,7 @@ public class KafkaStreamsBuilder { private String inputTopic; // NOPMD private String outputTopic; // NOPMD private Duration windowDuration; // NOPMD + private String schemaRegistryUrl; // NOPMD private int numThreads = -1; // NOPMD private int commitIntervalMs = -1; // NOPMD private int cacheMaxBytesBuff = -1; // NOPMD @@ -36,13 +37,13 @@ public class KafkaStreamsBuilder { return this; } - public KafkaStreamsBuilder outputTopic(final String outputTopic) { - this.outputTopic = outputTopic; + public KafkaStreamsBuilder schemaRegistry(final String url) { + this.schemaRegistryUrl = url; return this; } - public KafkaStreamsBuilder windowDuration(final Duration windowDuration) { - this.windowDuration = windowDuration; + public KafkaStreamsBuilder outputTopic(final String outputTopic) { + this.outputTopic = outputTopic; return this; } @@ -91,7 +92,7 @@ public class KafkaStreamsBuilder { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); // TODO log parameters final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, - this.windowDuration, null); + this.windowDuration, this.schemaRegistryUrl); final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers) .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)