From 55e159dbcc51b81a9db8932e1f83d543857a73a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de> Date: Mon, 30 Nov 2020 13:14:34 +0100 Subject: [PATCH] Add configuration for processing guarantee --- .../theodolite/commons/kafkastreams/KafkaStreamsBuilder.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java index 0bddef992..ef1ece354 100644 --- a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java +++ b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java @@ -91,6 +91,9 @@ public abstract class KafkaStreamsBuilder { this.setOptionalProperty(propBuilder, StreamsConfig.POLL_MS_CONFIG, this.config::getLong, p -> p >= 0); + this.setOptionalProperty(propBuilder, StreamsConfig.PROCESSING_GUARANTEE_CONFIG, + this.config::getString, p -> StreamsConfig.AT_LEAST_ONCE.equals(p) + || StreamsConfig.EXACTLY_ONCE.equals(p) || StreamsConfig.EXACTLY_ONCE_BETA.equals(p)); this.setOptionalProperty(propBuilder, StreamsConfig.REPLICATION_FACTOR_CONFIG, this.config::getInt, p -> p >= 0); -- GitLab