Skip to content
Snippets Groups Projects
Commit a89fbc33 authored by Sören Henning's avatar Sören Henning
Browse files

Fix code quality issues

parent 315aed82
No related branches found
No related tags found
1 merge request!249Align package structure among all benchmark implementations
Pipeline #6766 passed
Showing
with 50 additions and 125 deletions
package theodolite.commons.beam;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.commons.configuration2.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.configuration.ServiceConfigurations;
/**
* Abstraction of a Beam microservice. Encapsulates the corresponding {@link PipelineOptions} and
* the beam Runner.
*/
@Deprecated
public class AbstractBeamService {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBeamService.class);
// Beam Pipeline
protected PipelineOptions options;
// Application Configurations
private final Configuration config = ServiceConfigurations.createWithDefaults();
private final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
/**
* Creates AbstractBeamService with options.
*/
public AbstractBeamService(final String[] args) { // NOPMD
super();
LOGGER.info("Pipeline options:");
for (final String s : args) {
LOGGER.info("{}", s);
}
this.options = PipelineOptionsFactory.fromArgs(args).create();
this.options.setJobName(this.applicationName);
LOGGER.info("Starting BeamService with PipelineOptions: {}", this.options.toString());
}
public Configuration getConfig() {
return this.config;
}
}
package theodolite.commons.beam;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
/**
* Abstraction of a Beam {@link Pipeline}.
*/
@Deprecated
public class AbstractPipeline extends Pipeline {
private static final String KAFKA_CONFIG_SPECIFIC_AVRO_READER = "specific.avro.reader"; // NOPMD
private static final String KAFKA_CONFIG_SCHEMA_REGISTRY_URL = "schema.registry.url"; // NOPMD
protected final String inputTopic;
protected final String bootstrapServer;
// Application Configurations
private final Configuration config;
protected AbstractPipeline(final PipelineOptions options, final Configuration config) {
super(options);
this.config = config;
this.inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
this.bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
}
/**
* Builds a simple configuration for a Kafka consumer transformation.
*
* @return the build configuration.
*/
public Map<String, Object> buildConsumerConfig() {
final Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
consumerConfig.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
consumerConfig.put(
KAFKA_CONFIG_SCHEMA_REGISTRY_URL,
this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
consumerConfig.put(
KAFKA_CONFIG_SPECIFIC_AVRO_READER,
this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
consumerConfig.put(
ConsumerConfig.GROUP_ID_CONFIG,
this.config.getString(ConfigurationKeys.APPLICATION_NAME));
return consumerConfig;
}
/**
* Builds a simple configuration for a Kafka producer transformation.
*
* @return the build configuration.
*/
public Map<String, Object> buildProducerConfig() {
final Map<String, Object> config = new HashMap<>();
config.put(
KAFKA_CONFIG_SCHEMA_REGISTRY_URL,
this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
config.put(
KAFKA_CONFIG_SPECIFIC_AVRO_READER,
this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
return config;
}
}
...@@ -11,6 +11,11 @@ import org.apache.commons.configuration2.Configuration; ...@@ -11,6 +11,11 @@ import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
/**
* Abstract factory class for creating Beam pipelines from a {@link Configuration} and
* {@link PipelineOptions}. Implementations may expand the {@link PipelineOptions}, construct a
* {@link Pipeline} and register coders.
*/
public abstract class AbstractPipelineFactory { public abstract class AbstractPipelineFactory {
protected final Configuration config; protected final Configuration config;
...@@ -19,6 +24,9 @@ public abstract class AbstractPipelineFactory { ...@@ -19,6 +24,9 @@ public abstract class AbstractPipelineFactory {
this.config = configuration; this.config = configuration;
} }
/**
* Create a Pipeline with the configured {@link PipelineOptions}.
*/
public final Pipeline create(final PipelineOptions options) { public final Pipeline create(final PipelineOptions options) {
this.expandOptions(options); this.expandOptions(options);
final Pipeline pipeline = Pipeline.create(options); final Pipeline pipeline = Pipeline.create(options);
......
...@@ -10,6 +10,10 @@ import org.slf4j.Logger; ...@@ -10,6 +10,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import titan.ccp.common.configuration.ServiceConfigurations; import titan.ccp.common.configuration.ServiceConfigurations;
/**
* A general Apache Beam-based microservice. It is configured by Beam pipeline, a Beam runner and
* additional configuration.
*/
public class BeamService { public class BeamService {
private static final Logger LOGGER = LoggerFactory.getLogger(BeamService.class); private static final Logger LOGGER = LoggerFactory.getLogger(BeamService.class);
...@@ -20,16 +24,27 @@ public class BeamService { ...@@ -20,16 +24,27 @@ public class BeamService {
private final AbstractPipelineFactory pipelineFactory; private final AbstractPipelineFactory pipelineFactory;
private final PipelineOptions pipelineOptions; private final PipelineOptions pipelineOptions;
/**
* Create a new {@link BeamService}.
*
* @param pipelineFactoryFactory {@link Function} for creating an {@link AbstractPipelineFactory}
* based on a {@link Configuration}.
* @param runner The Beam {@link PipelineRunner} to run this pipeline.
* @param args Arguments which are treated as {@link PipelineOptions}.
*/
public BeamService( public BeamService(
Function<Configuration, AbstractPipelineFactory> pipelineFactoryFactory, final Function<Configuration, AbstractPipelineFactory> pipelineFactoryFactory,
Class<? extends PipelineRunner<?>> runner, final Class<? extends PipelineRunner<?>> runner,
String[] args) { final String... args) {
this.pipelineFactory = pipelineFactoryFactory.apply(this.config); this.pipelineFactory = pipelineFactoryFactory.apply(this.config);
this.pipelineOptions = PipelineOptionsFactory.fromArgs(args).create(); this.pipelineOptions = PipelineOptionsFactory.fromArgs(args).create();
this.pipelineOptions.setJobName(this.applicationName); this.pipelineOptions.setJobName(this.applicationName);
this.pipelineOptions.setRunner(runner); this.pipelineOptions.setRunner(runner);
} }
/**
* Start this microservice, by running the underlying Beam pipeline.
*/
public void run() { public void run() {
LOGGER.info("Construct Beam pipeline with pipeline options: {}", LOGGER.info("Construct Beam pipeline with pipeline options: {}",
this.pipelineOptions.toString()); this.pipelineOptions.toString());
......
...@@ -11,6 +11,9 @@ import theodolite.commons.beam.AbstractPipelineFactory; ...@@ -11,6 +11,9 @@ import theodolite.commons.beam.AbstractPipelineFactory;
import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/**
* {@link AbstractPipelineFactory} for UC1.
*/
public class PipelineFactory extends AbstractPipelineFactory { public class PipelineFactory extends AbstractPipelineFactory {
public static final String SINK_TYPE_KEY = "sink.type"; public static final String SINK_TYPE_KEY = "sink.type";
...@@ -21,6 +24,7 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -21,6 +24,7 @@ public class PipelineFactory extends AbstractPipelineFactory {
@Override @Override
protected void expandOptions(final PipelineOptions options) { protected void expandOptions(final PipelineOptions options) {
// No options to set
// TODO Add for PubSub // TODO Add for PubSub
// final String pubSubEmulatorHost = super.config.getString(null); // final String pubSubEmulatorHost = super.config.getString(null);
// if (pubSubEmulatorHost != null) { // if (pubSubEmulatorHost != null) {
......
...@@ -24,6 +24,9 @@ import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; ...@@ -24,6 +24,9 @@ import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
import theodolite.commons.beam.kafka.KafkaWriterTransformation; import theodolite.commons.beam.kafka.KafkaWriterTransformation;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/**
* {@link AbstractPipelineFactory} for UC2.
*/
public class PipelineFactory extends AbstractPipelineFactory { public class PipelineFactory extends AbstractPipelineFactory {
public PipelineFactory(final Configuration configuration) { public PipelineFactory(final Configuration configuration) {
...@@ -31,7 +34,9 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -31,7 +34,9 @@ public class PipelineFactory extends AbstractPipelineFactory {
} }
@Override @Override
protected void expandOptions(final PipelineOptions options) {} protected void expandOptions(final PipelineOptions options) {
// No options to set
}
@Override @Override
protected void constructPipeline(final Pipeline pipeline) { protected void constructPipeline(final Pipeline pipeline) {
......
...@@ -25,6 +25,9 @@ import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; ...@@ -25,6 +25,9 @@ import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
import theodolite.commons.beam.kafka.KafkaWriterTransformation; import theodolite.commons.beam.kafka.KafkaWriterTransformation;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/**
* {@link AbstractPipelineFactory} for UC3.
*/
public class PipelineFactory extends AbstractPipelineFactory { public class PipelineFactory extends AbstractPipelineFactory {
public PipelineFactory(final Configuration configuration) { public PipelineFactory(final Configuration configuration) {
...@@ -32,7 +35,9 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -32,7 +35,9 @@ public class PipelineFactory extends AbstractPipelineFactory {
} }
@Override @Override
protected void expandOptions(final PipelineOptions options) {} protected void expandOptions(final PipelineOptions options) {
// No options to set
}
@Override @Override
protected void constructPipeline(final Pipeline pipeline) { protected void constructPipeline(final Pipeline pipeline) {
......
package application; package application;
import org.apache.beam.runners.samza.SamzaRunner; import org.apache.beam.runners.samza.SamzaRunner;
import theodolite.commons.beam.BeamService; import theodolite.commons.beam.BeamService;
......
package application; package application; // NOPMD
import com.google.common.math.StatsAccumulator; import com.google.common.math.StatsAccumulator;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
...@@ -50,6 +50,9 @@ import titan.ccp.configuration.events.Event; ...@@ -50,6 +50,9 @@ import titan.ccp.configuration.events.Event;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.AggregatedActivePowerRecord; import titan.ccp.model.records.AggregatedActivePowerRecord;
/**
* {@link AbstractPipelineFactory} for UC4.
*/
public class PipelineFactory extends AbstractPipelineFactory { public class PipelineFactory extends AbstractPipelineFactory {
public PipelineFactory(final Configuration configuration) { public PipelineFactory(final Configuration configuration) {
...@@ -57,10 +60,12 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -57,10 +60,12 @@ public class PipelineFactory extends AbstractPipelineFactory {
} }
@Override @Override
protected void expandOptions(final PipelineOptions options) {} protected void expandOptions(final PipelineOptions options) {
// No options to set
}
@Override @Override
protected void constructPipeline(final Pipeline pipeline) { protected void constructPipeline(final Pipeline pipeline) { // NOPMD
// Additional needed variables // Additional needed variables
final String feedbackTopic = this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); final String feedbackTopic = this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC);
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment