Uc1BeamPipeline.java

    package application;
    
    import java.util.Map;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.coders.CoderRegistry;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.commons.configuration2.Configuration;
    import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
    import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
    import theodolite.commons.beam.AbstractPipeline;
    import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
    import titan.ccp.model.records.ActivePowerRecord;
    
    
    /**
     * Implementation of the use case Database Storage using Apache Beam with the Flink runner. To
     * execute locally in standalone mode, start Kafka, ZooKeeper, the Schema Registry, and the
     * workload generator using the delayed_startup.sh script. Start a Flink cluster and pass its
     * REST address via the --flinkMaster run parameter. To persist logs, add
     * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard
     * Input Output in the Common tab of the Run Configuration, then start via Eclipse Run.
     */
    public final class Uc1BeamPipeline extends AbstractPipeline {
    
      private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
    
      protected Uc1BeamPipeline(final PipelineOptions options, final Configuration config) {
        super(options, config);
    
        // Register coders for classes that will be distributed between workers
        final CoderRegistry cr = this.getCoderRegistry();
        cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
    
        // Build the Kafka consumer configuration
        final Map<String, Object> consumerConfig = this.buildConsumerConfig();

        // Create the reader that consumes timestamped ActivePowerRecords from Kafka
        final KafkaActivePowerTimestampReader kafka =
            new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig);
    
        // Assemble the pipeline: read from Kafka, keep only the record values,
        // convert each record to its JSON string representation, and hand it to
        // the database writer (a logger here, see LogWriterFactory.forJson()).
        // ConverterAdapter and WriterAdapter are helper classes from this package.
        this.apply(kafka)
            .apply(Values.create())
            .apply(MapElements.via(new ConverterAdapter<>(
                this.databaseAdapter.getRecordConverter(),
                String.class)))
            .apply(ParDo.of(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter())));
      }
    }
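
For orientation, the sketch below shows how such a pipeline could be launched. It is an assumption-laden illustration, not code from the repository: the launcher class Uc1BeamRunnerSketch is hypothetical, AbstractPipeline is assumed to extend org.apache.beam.sdk.Pipeline (so that run() is available), and the configuration keys are made up for the example.

    package application;

    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.commons.configuration2.BaseConfiguration;

    /**
     * Hypothetical launcher for Uc1BeamPipeline; not part of the repository.
     */
    public final class Uc1BeamRunnerSketch {

      private Uc1BeamRunnerSketch() {}

      public static void main(final String[] args) {
        // Runner flags such as --runner=FlinkRunner or --flinkMaster=<host:port>
        // are parsed from the command line into the pipeline options.
        final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();

        // The keys below are assumptions for illustration only; the real keys
        // are defined by AbstractPipeline and the benchmark's configuration.
        final BaseConfiguration config = new BaseConfiguration();
        config.setProperty("kafka.bootstrap.servers", "localhost:9092");
        config.setProperty("kafka.input.topic", "input");

        // Same-package access to the protected constructor; run the pipeline
        // and block until it terminates.
        new Uc1BeamPipeline(options, config).run().waitUntilFinish();
      }
    }

In the repository itself, pipeline construction and execution are wired up by the benchmark's own runner classes, so this sketch only illustrates the underlying Beam mechanics.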