Skip to content
Snippets Groups Projects
Commit 9cfffc58 authored by Sören Henning's avatar Sören Henning
Browse files

Fix code quality issues

parent 66d0d2e3
Branches
Tags
2 merge requests!250Refactor Beam service,!249Align package structure among all benchmark implementations
Pipeline #6835 passed
Showing
with 50 additions and 125 deletions
package theodolite.commons.beam;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.commons.configuration2.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.configuration.ServiceConfigurations;
/**
* Abstraction of a Beam microservice. Encapsulates the corresponding {@link PipelineOptions} and
* the beam Runner.
*/
@Deprecated
public class AbstractBeamService {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBeamService.class);
// Beam Pipeline
protected PipelineOptions options;
// Application Configurations
private final Configuration config = ServiceConfigurations.createWithDefaults();
private final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
/**
* Creates AbstractBeamService with options.
*/
public AbstractBeamService(final String[] args) { // NOPMD
super();
LOGGER.info("Pipeline options:");
for (final String s : args) {
LOGGER.info("{}", s);
}
this.options = PipelineOptionsFactory.fromArgs(args).create();
this.options.setJobName(this.applicationName);
LOGGER.info("Starting BeamService with PipelineOptions: {}", this.options.toString());
}
public Configuration getConfig() {
return this.config;
}
}
package theodolite.commons.beam;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
/**
* Abstraction of a Beam {@link Pipeline}.
*/
@Deprecated
public class AbstractPipeline extends Pipeline {
private static final String KAFKA_CONFIG_SPECIFIC_AVRO_READER = "specific.avro.reader"; // NOPMD
private static final String KAFKA_CONFIG_SCHEMA_REGISTRY_URL = "schema.registry.url"; // NOPMD
protected final String inputTopic;
protected final String bootstrapServer;
// Application Configurations
private final Configuration config;
protected AbstractPipeline(final PipelineOptions options, final Configuration config) {
super(options);
this.config = config;
this.inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
this.bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
}
/**
* Builds a simple configuration for a Kafka consumer transformation.
*
* @return the build configuration.
*/
public Map<String, Object> buildConsumerConfig() {
final Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
consumerConfig.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
consumerConfig.put(
KAFKA_CONFIG_SCHEMA_REGISTRY_URL,
this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
consumerConfig.put(
KAFKA_CONFIG_SPECIFIC_AVRO_READER,
this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
consumerConfig.put(
ConsumerConfig.GROUP_ID_CONFIG,
this.config.getString(ConfigurationKeys.APPLICATION_NAME));
return consumerConfig;
}
/**
* Builds a simple configuration for a Kafka producer transformation.
*
* @return the build configuration.
*/
public Map<String, Object> buildProducerConfig() {
final Map<String, Object> config = new HashMap<>();
config.put(
KAFKA_CONFIG_SCHEMA_REGISTRY_URL,
this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
config.put(
KAFKA_CONFIG_SPECIFIC_AVRO_READER,
this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
return config;
}
}
...@@ -11,6 +11,11 @@ import org.apache.commons.configuration2.Configuration; ...@@ -11,6 +11,11 @@ import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
/**
* Abstract factory class for creating Beam pipelines from a {@link Configuration} and
* {@link PipelineOptions}. Implementations may expand the {@link PipelineOptions}, construct a
* {@link Pipeline} and register coders.
*/
public abstract class AbstractPipelineFactory { public abstract class AbstractPipelineFactory {
protected final Configuration config; protected final Configuration config;
...@@ -19,6 +24,9 @@ public abstract class AbstractPipelineFactory { ...@@ -19,6 +24,9 @@ public abstract class AbstractPipelineFactory {
this.config = configuration; this.config = configuration;
} }
/**
* Create a Pipeline with the configured {@link PipelineOptions}.
*/
public final Pipeline create(final PipelineOptions options) { public final Pipeline create(final PipelineOptions options) {
this.expandOptions(options); this.expandOptions(options);
final Pipeline pipeline = Pipeline.create(options); final Pipeline pipeline = Pipeline.create(options);
......
...@@ -10,6 +10,10 @@ import org.slf4j.Logger; ...@@ -10,6 +10,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import titan.ccp.common.configuration.ServiceConfigurations; import titan.ccp.common.configuration.ServiceConfigurations;
/**
* A general Apache Beam-based microservice. It is configured by Beam pipeline, a Beam runner and
* additional configuration.
*/
public class BeamService { public class BeamService {
private static final Logger LOGGER = LoggerFactory.getLogger(BeamService.class); private static final Logger LOGGER = LoggerFactory.getLogger(BeamService.class);
...@@ -20,16 +24,27 @@ public class BeamService { ...@@ -20,16 +24,27 @@ public class BeamService {
private final AbstractPipelineFactory pipelineFactory; private final AbstractPipelineFactory pipelineFactory;
private final PipelineOptions pipelineOptions; private final PipelineOptions pipelineOptions;
/**
* Create a new {@link BeamService}.
*
* @param pipelineFactoryFactory {@link Function} for creating an {@link AbstractPipelineFactory}
* based on a {@link Configuration}.
* @param runner The Beam {@link PipelineRunner} to run this pipeline.
* @param args Arguments which are treated as {@link PipelineOptions}.
*/
public BeamService( public BeamService(
Function<Configuration, AbstractPipelineFactory> pipelineFactoryFactory, final Function<Configuration, AbstractPipelineFactory> pipelineFactoryFactory,
Class<? extends PipelineRunner<?>> runner, final Class<? extends PipelineRunner<?>> runner,
String[] args) { final String... args) {
this.pipelineFactory = pipelineFactoryFactory.apply(this.config); this.pipelineFactory = pipelineFactoryFactory.apply(this.config);
this.pipelineOptions = PipelineOptionsFactory.fromArgs(args).create(); this.pipelineOptions = PipelineOptionsFactory.fromArgs(args).create();
this.pipelineOptions.setJobName(this.applicationName); this.pipelineOptions.setJobName(this.applicationName);
this.pipelineOptions.setRunner(runner); this.pipelineOptions.setRunner(runner);
} }
/**
* Start this microservice, by running the underlying Beam pipeline.
*/
public void run() { public void run() {
LOGGER.info("Construct Beam pipeline with pipeline options: {}", LOGGER.info("Construct Beam pipeline with pipeline options: {}",
this.pipelineOptions.toString()); this.pipelineOptions.toString());
......
...@@ -11,6 +11,9 @@ import theodolite.commons.beam.AbstractPipelineFactory; ...@@ -11,6 +11,9 @@ import theodolite.commons.beam.AbstractPipelineFactory;
import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/**
* {@link AbstractPipelineFactory} for UC1.
*/
public class PipelineFactory extends AbstractPipelineFactory { public class PipelineFactory extends AbstractPipelineFactory {
public static final String SINK_TYPE_KEY = "sink.type"; public static final String SINK_TYPE_KEY = "sink.type";
...@@ -21,6 +24,7 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -21,6 +24,7 @@ public class PipelineFactory extends AbstractPipelineFactory {
@Override @Override
protected void expandOptions(final PipelineOptions options) { protected void expandOptions(final PipelineOptions options) {
// No options to set
// TODO Add for PubSub // TODO Add for PubSub
// final String pubSubEmulatorHost = super.config.getString(null); // final String pubSubEmulatorHost = super.config.getString(null);
// if (pubSubEmulatorHost != null) { // if (pubSubEmulatorHost != null) {
......
...@@ -24,6 +24,9 @@ import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; ...@@ -24,6 +24,9 @@ import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
import theodolite.commons.beam.kafka.KafkaWriterTransformation; import theodolite.commons.beam.kafka.KafkaWriterTransformation;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/**
* {@link AbstractPipelineFactory} for UC2.
*/
public class PipelineFactory extends AbstractPipelineFactory { public class PipelineFactory extends AbstractPipelineFactory {
public PipelineFactory(final Configuration configuration) { public PipelineFactory(final Configuration configuration) {
...@@ -31,7 +34,9 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -31,7 +34,9 @@ public class PipelineFactory extends AbstractPipelineFactory {
} }
@Override @Override
protected void expandOptions(final PipelineOptions options) {} protected void expandOptions(final PipelineOptions options) {
// No options to set
}
@Override @Override
protected void constructPipeline(final Pipeline pipeline) { protected void constructPipeline(final Pipeline pipeline) {
......
...@@ -25,6 +25,9 @@ import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; ...@@ -25,6 +25,9 @@ import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
import theodolite.commons.beam.kafka.KafkaWriterTransformation; import theodolite.commons.beam.kafka.KafkaWriterTransformation;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/**
* {@link AbstractPipelineFactory} for UC3.
*/
public class PipelineFactory extends AbstractPipelineFactory { public class PipelineFactory extends AbstractPipelineFactory {
public PipelineFactory(final Configuration configuration) { public PipelineFactory(final Configuration configuration) {
...@@ -32,7 +35,9 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -32,7 +35,9 @@ public class PipelineFactory extends AbstractPipelineFactory {
} }
@Override @Override
protected void expandOptions(final PipelineOptions options) {} protected void expandOptions(final PipelineOptions options) {
// No options to set
}
@Override @Override
protected void constructPipeline(final Pipeline pipeline) { protected void constructPipeline(final Pipeline pipeline) {
......
package application; package application;
import org.apache.beam.runners.samza.SamzaRunner; import org.apache.beam.runners.samza.SamzaRunner;
import theodolite.commons.beam.BeamService; import theodolite.commons.beam.BeamService;
......
package application; package application; // NOPMD
import com.google.common.math.StatsAccumulator; import com.google.common.math.StatsAccumulator;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
...@@ -50,6 +50,9 @@ import titan.ccp.configuration.events.Event; ...@@ -50,6 +50,9 @@ import titan.ccp.configuration.events.Event;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.AggregatedActivePowerRecord; import titan.ccp.model.records.AggregatedActivePowerRecord;
/**
* {@link AbstractPipelineFactory} for UC4.
*/
public class PipelineFactory extends AbstractPipelineFactory { public class PipelineFactory extends AbstractPipelineFactory {
public PipelineFactory(final Configuration configuration) { public PipelineFactory(final Configuration configuration) {
...@@ -57,10 +60,12 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -57,10 +60,12 @@ public class PipelineFactory extends AbstractPipelineFactory {
} }
@Override @Override
protected void expandOptions(final PipelineOptions options) {} protected void expandOptions(final PipelineOptions options) {
// No options to set
}
@Override @Override
protected void constructPipeline(final Pipeline pipeline) { protected void constructPipeline(final Pipeline pipeline) { // NOPMD
// Additional needed variables // Additional needed variables
final String feedbackTopic = this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); final String feedbackTopic = this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC);
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment