Skip to content
Snippets Groups Projects
Select Git revision
  • main default protected
  • v0.10
  • rework-examples
  • otel-demo-dynatrace-example
  • support-empty-query-response
  • java-operator-sdk
  • rework-state-handling
  • quarkus-36
  • bump-kotlinlogging-to-5.0.2
  • use-internal-registry protected
  • v0.9 protected
  • kafka-nodeport-config-windows
  • v0.8 protected
  • test-k3d protected
  • simpleuc4 protected
  • reduce-code-duplication
  • test-coverage
  • code-cleanup
  • cleanup-commit-interval protected
  • delete-action-for-other-namespace
  • v0.10.0 protected
  • v0.9.0 protected
  • v0.8.6 protected
  • v0.8.5 protected
  • v0.8.4 protected
  • v0.8.3 protected
  • v0.8.2 protected
  • v0.8.1 protected
  • v0.8.0 protected
  • v0.7.0 protected
  • v0.5.2 protected
  • v0.6.4 protected
  • v0.6.3 protected
  • v0.6.2 protected
  • v0.6.1 protected
  • v0.6.0 protected
  • v0.5.1 protected
  • v0.5.0 protected
  • v0.4.0 protected
  • v0.3.0 protected
40 results

HistoryService.java

Blame
  • To find the state of this project's repository at the time of any of these versions, check out the tags.
    HistoryService.java 1.81 KiB
    package uc4.application;
    
    import java.time.Duration;
    import java.util.concurrent.CompletableFuture;
    import org.apache.commons.configuration2.Configuration;
    import org.apache.kafka.streams.KafkaStreams;
    import titan.ccp.common.configuration.Configurations;
    import uc4.streamprocessing.KafkaStreamsBuilder;
    
    /**
     * A microservice that manages the history and, therefore, stores and aggregates incoming
     * measurements.
     *
     * <p>The service reads its settings from the {@link Configuration} returned by
     * {@link Configurations#create()} and runs a single Kafka Streams application that is assembled
     * via {@link KafkaStreamsBuilder}.
     */
    public class HistoryService {

      private final Configuration config = Configurations.create();

      // Completing this future closes the Kafka Streams application (see the callback registered in
      // createKafkaStreamsApplication()).
      private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();

      /**
       * Start the service.
       */
      public void run() {
        this.createKafkaStreamsApplication();
      }

      /**
       * Build and start the underlying Kafka Streams application of the service.
       *
       * <p>The application is configured from the following {@link ConfigurationKeys}: Kafka
       * bootstrap servers, input topic, output topic, aggregation duration and advance (both read as
       * a number of days), number of threads, commit interval in milliseconds and the maximum number
       * of bytes used for cache buffering.
       *
       * <p>The application is closed once {@code stopEvent} completes. A JVM shutdown hook completes
       * that event, so the application is closed when the JVM terminates.
       */
      private void createKafkaStreamsApplication() {
        final Duration aggregationDuration =
            Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
        final Duration aggregationAdvance =
            Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));

        final KafkaStreams kafkaStreams = new KafkaStreamsBuilder()
            .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
            .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
            .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
            // NOTE(review): "aggregtionDuration" is the spelling used by the builder's API here;
            // it must not be "corrected" in this class alone.
            .aggregtionDuration(aggregationDuration)
            .aggregationAdvance(aggregationAdvance)
            .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
            .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
            .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
            .build();

        this.stopEvent.thenRun(kafkaStreams::close);

        // Nothing else in this class completes stopEvent, so without this hook the close callback
        // above would never run. Completing the event on JVM shutdown triggers the close.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> this.stopEvent.complete(null)));

        kafkaStreams.start();
      }

      /**
       * Entry point: create a {@link HistoryService} and start it.
       *
       * @param args command line arguments (not used)
       */
      public static void main(final String[] args) {
        new HistoryService().run();
      }

    }