Skip to content
Snippets Groups Projects
Select Git revision
  • 07b877b3a02440754dfecdbf9f79cc6ac7402825
  • main default protected
  • v0.10
  • rework-examples
  • otel-demo-dynatrace-example
  • support-empty-query-response
  • java-operator-sdk
  • rework-state-handling
  • quarkus-36
  • bump-kotlinlogging-to-5.0.2
  • use-internal-registry protected
  • v0.9 protected
  • kafka-nodeport-config-windows
  • v0.8 protected
  • test-k3d protected
  • simpleuc4 protected
  • reduce-code-duplication
  • test-coverage
  • code-cleanup
  • cleanup-commit-interval protected
  • delete-action-for-other-namespace
  • v0.10.0 protected
  • v0.9.0 protected
  • v0.8.6 protected
  • v0.8.5 protected
  • v0.8.4 protected
  • v0.8.3 protected
  • v0.8.2 protected
  • v0.8.1 protected
  • v0.8.0 protected
  • v0.7.0 protected
  • v0.5.2 protected
  • v0.6.4 protected
  • v0.6.3 protected
  • v0.6.2 protected
  • v0.6.1 protected
  • v0.6.0 protected
  • v0.5.1 protected
  • v0.5.0 protected
  • v0.4.0 protected
  • v0.3.0 protected
41 results

KafkaStreamsBuilder.java

Blame
  • user avatar
    Björn Vonheiden authored
    Add the spesb prefix to all self-implemented Java packages of the
    project. Because this changes the fully qualified class names of
    the main classes, the build.gradle files need to be adapted accordingly.
    07b877b3
    History
    KafkaStreamsBuilder.java 3.45 KiB
    package spesb.uc1.streamprocessing;
    
    import java.util.Objects;
    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import titan.ccp.common.kafka.streams.PropertiesBuilder;
    
    /**
     * Builder for a {@link KafkaStreams} instance together with its Kafka Streams configuration.
     *
     * <p>
     * The input topic and the bootstrap servers are mandatory and must be set before calling
     * {@link #build()}. The numeric settings are optional: each of them defaults to {@code -1}, which
     * stands for "do not configure explicitly".
     * </p>
     */
    public class KafkaStreamsBuilder {

      private static final String APPLICATION_NAME = "titan-ccp-history";
      private static final String APPLICATION_VERSION = "0.0.1";

      // private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class);

      private String bootstrapServers; // NOPMD
      private String inputTopic; // NOPMD
      private int numThreads = -1; // NOPMD
      private int commitIntervalMs = -1; // NOPMD
      private int cacheMaxBytesBuff = -1; // NOPMD

      /**
       * Sets the name of the topic that is passed to the {@code TopologyBuilder}.
       *
       * @param inputTopic name of the input topic; must be non-null by the time {@link #build()} is
       *        called
       * @return this builder
       */
      public KafkaStreamsBuilder inputTopic(final String inputTopic) {
        this.inputTopic = inputTopic;
        return this;
      }

      /**
       * Sets the Kafka bootstrap servers used for the Kafka Streams configuration.
       *
       * @param bootstrapServers the bootstrap servers; must be non-null by the time {@link #build()}
       *        is called
       * @return this builder
       */
      public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
        return this;
      }

      /**
       * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus
       * one for using the default.
       *
       * @param numThreads a positive number of threads, or {@code -1} for the default
       * @return this builder
       * @throws IllegalArgumentException if {@code numThreads} is zero or less than {@code -1}
       */
      public KafkaStreamsBuilder numThreads(final int numThreads) {
        if (numThreads < -1 || numThreads == 0) {
          throw new IllegalArgumentException("Number of threads must be greater 0 or -1.");
        }
        this.numThreads = numThreads;
        return this;
      }

      /**
       * Sets the Kafka Streams property for the frequency with which to save the position (offsets in
       * source topics) of tasks (commit.interval.ms). Must be zero for processing all records, for
       * example, when processing bulks of records. Can be minus one for using the default.
       *
       * @param commitIntervalMs a non-negative interval in milliseconds, or {@code -1} for the default
       * @return this builder
       * @throws IllegalArgumentException if {@code commitIntervalMs} is less than {@code -1}
       */
      public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) {
        if (commitIntervalMs < -1) {
          throw new IllegalArgumentException("Commit interval must be greater or equal -1.");
        }
        this.commitIntervalMs = commitIntervalMs;
        return this;
      }

      /**
       * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches
       * across all threads (cache.max.bytes.buffering). Must be zero for processing all records, for
       * example, when processing bulks of records. Can be minus one for using the default.
       *
       * @param cacheMaxBytesBuffering a non-negative number of bytes, or {@code -1} for the default
       * @return this builder
       * @throws IllegalArgumentException if {@code cacheMaxBytesBuffering} is less than {@code -1}
       */
      public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) {
        if (cacheMaxBytesBuffering < -1) {
          throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1.");
        }
        this.cacheMaxBytesBuff = cacheMaxBytesBuffering;
        return this;
      }

      /**
       * Builds the {@link KafkaStreams} instance.
       *
       * @return a new {@link KafkaStreams} instance created from the topology and the collected
       *         properties
       * @throws NullPointerException if the input topic or the bootstrap servers have not been set
       */
      public KafkaStreams build() {
        // Validate all mandatory settings up front so that a missing one is reported with a clear
        // message before any topology or configuration object is created.
        Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
        Objects.requireNonNull(this.bootstrapServers, "Bootstrap servers have not been set.");
        // TODO log parameters
        final TopologyBuilder topologyBuilder = new TopologyBuilder(
            this.inputTopic);
        // The -1 sentinel fails each predicate below; presumably PropertiesBuilder then leaves the
        // property unset so that the Kafka Streams default applies (verify against PropertiesBuilder).
        final Properties properties = PropertiesBuilder
            .bootstrapServers(this.bootstrapServers)
            .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter
            .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
            .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0)
            .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0)
            // .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
            .build();
        return new KafkaStreams(topologyBuilder.build(), properties);
      }

    }