Skip to content
Snippets Groups Projects
Commit 40ccbe7b authored by Sören Henning's avatar Sören Henning
Browse files

Merge master

parents 8d69ba7a cac915db
No related branches found
No related tags found
No related merge requests found
Showing
with 2 additions and 13 deletions
File moved
File moved
...@@ -36,26 +36,15 @@ public class AggregationService { ...@@ -36,26 +36,15 @@ public class AggregationService {
* @param clusterSession the database session which the application should use. * @param clusterSession the database session which the application should use.
*/ */
private void createKafkaStreamsApplication() { private void createKafkaStreamsApplication() {
// Use case specific stream configuration final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(this.config);
final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder();
uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder
.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
.feedbackTopic(this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC)) .feedbackTopic(this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC))
.outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
.configurationTopic(this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC)) .configurationTopic(this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC))
.emitPeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS))) .emitPeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS)))
.gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS))); .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS)));
// Configuration of the stream application final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder.build();
final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder
.applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME))
.applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION))
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
.build();
this.stopEvent.thenRun(kafkaStreams::close); this.stopEvent.thenRun(kafkaStreams::close);
kafkaStreams.start(); kafkaStreams.start();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment