Skip to content
Snippets Groups Projects
Commit dd0aa284 authored by Sören Henning's avatar Sören Henning
Browse files

Draft for new service and pipeline factory

parent b874591c
No related branches found
No related tags found
1 merge request!249Align package structure among all benchmark implementations
Pipeline #6643 failed
package application;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.commons.configuration2.Configuration;
import titan.ccp.model.records.ActivePowerRecord;
public abstract class AbstractPipelineFactory {
protected final Configuration configuration;
public AbstractPipelineFactory(final Configuration configuration) {
this.configuration = configuration;
}
public final Pipeline create(final PipelineOptions options) {
final Pipeline pipeline = Pipeline.create(options);
this.constructPipeline(pipeline);
this.registerCoders(pipeline.getCoderRegistry());
return pipeline;
}
private void constructPipeline(Pipeline pipeline) {
// pipeline.apply(kafka)
// .apply(Values.create())
// .apply(sinkType.create(config));
}
private void registerCoders(CoderRegistry registry) {
registry.registerCoderForClass(
ActivePowerRecord.class,
AvroCoder.of(ActivePowerRecord.SCHEMA$));
}
}
package application;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.commons.configuration2.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.beam.ConfigurationKeys;
import titan.ccp.common.configuration.ServiceConfigurations;
public class BeamService {
private static final Logger LOGGER = LoggerFactory.getLogger(BeamService.class);
private final Configuration config = ServiceConfigurations.createWithDefaults();
private final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
private final AbstractPipelineFactory pipelineFactory;
private final PipelineOptions pipelineOptions;
public BeamService(
AbstractPipelineFactory pipelineFactory,
Class<? extends PipelineRunner<?>> runner,
String[] args) {
this.pipelineFactory = pipelineFactory;
this.pipelineOptions = PipelineOptionsFactory.fromArgs(args).create();
this.pipelineOptions.setJobName(this.applicationName);
this.pipelineOptions.setRunner(runner);
}
public void run() {
LOGGER.info("Starting BeamService with pipeline options: {}", this.pipelineOptions.toString());
final Pipeline pipeline = this.pipelineFactory.create(this.config, this.pipelineOptions);
pipeline.run().waitUntilFinish();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment