Skip to content
Snippets Groups Projects
Select Git revision
  • dd0aa2843ba6922739100d97de5e84794356551b
  • main default protected
  • v0.10
  • rework-examples
  • otel-demo-dynatrace-example
  • support-empty-query-response
  • java-operator-sdk
  • rework-state-handling
  • quarkus-36
  • bump-kotlinlogging-to-5.0.2
  • use-internal-registry protected
  • v0.9 protected
  • kafka-nodeport-config-windows
  • v0.8 protected
  • test-k3d protected
  • simpleuc4 protected
  • reduce-code-duplication
  • test-coverage
  • code-cleanup
  • cleanup-commit-interval protected
  • delete-action-for-other-namespace
  • v0.10.0 protected
  • v0.9.0 protected
  • v0.8.6 protected
  • v0.8.5 protected
  • v0.8.4 protected
  • v0.8.3 protected
  • v0.8.2 protected
  • v0.8.1 protected
  • v0.8.0 protected
  • v0.7.0 protected
  • v0.5.2 protected
  • v0.6.4 protected
  • v0.6.3 protected
  • v0.6.2 protected
  • v0.6.1 protected
  • v0.6.0 protected
  • v0.5.1 protected
  • v0.5.0 protected
  • v0.4.0 protected
  • v0.3.0 protected
41 results

AbstractPipelineFactory.java

Blame
  • AbstractPipelineFactory.java 1.08 KiB
    package application;
    
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.coders.CoderRegistry;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.commons.configuration2.Configuration;
    import titan.ccp.model.records.ActivePowerRecord;
    
    /**
     * Base class for factories that create Apache Beam {@link Pipeline}s.
     *
     * <p>{@link #create(PipelineOptions)} is the single entry point: it instantiates the pipeline,
     * registers the coders it needs and then constructs its transforms.
     */
    public abstract class AbstractPipelineFactory {

      /** Configuration handed to the constructor; accessible to subclasses. */
      protected final Configuration configuration;

      /**
       * Creates a new factory.
       *
       * @param configuration the configuration stored in {@link #configuration}
       */
      public AbstractPipelineFactory(final Configuration configuration) {
        this.configuration = configuration;
      }

      /**
       * Creates a new {@link Pipeline} for the given options.
       *
       * <p>Coders are registered before the pipeline is constructed, because Beam consults the
       * pipeline's {@link CoderRegistry} to infer coders while transforms are being applied. Coders
       * registered afterwards would not be seen by that inference.
       *
       * @param options the options the pipeline is created with
       * @return the newly created pipeline
       */
      public final Pipeline create(final PipelineOptions options) {
        final Pipeline pipeline = Pipeline.create(options);
        this.registerCoders(pipeline.getCoderRegistry());
        this.constructPipeline(pipeline);
        return pipeline;
      }

      /**
       * Applies the transforms to the given pipeline. Currently a no-op.
       *
       * @param pipeline the pipeline to construct
       */
      private void constructPipeline(final Pipeline pipeline) {
        // TODO Not implemented yet. Intended shape (sketch from the original author):
        // pipeline.apply(kafka)
        // .apply(Values.create())
        // .apply(sinkType.create(config));
      }

      /**
       * Registers an {@link AvroCoder} for {@link ActivePowerRecord} at the given registry.
       *
       * @param registry the registry to register the coder at
       */
      private void registerCoders(final CoderRegistry registry) {
        registry.registerCoderForClass(
            ActivePowerRecord.class,
            AvroCoder.of(ActivePowerRecord.SCHEMA$));
      }

    }