From a89fbc336e3251add83f73fbd956dcf538ddc829 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Thu, 24 Feb 2022 15:45:11 +0100 Subject: [PATCH] Fix code quality issues --- .../commons/beam/AbstractBeamService.java | 44 ------------ .../commons/beam/AbstractPipeline.java | 72 ------------------- .../commons/beam/AbstractPipelineFactory.java | 8 +++ .../theodolite/commons/beam/BeamService.java | 21 +++++- .../java/application/PipelineFactory.java | 4 ++ .../java/application/PipelineFactory.java | 7 +- .../java/application/PipelineFactory.java | 7 +- .../main/java/application/Uc4BeamSamza.java | 1 - .../java/application/PipelineFactory.java | 11 ++- 9 files changed, 50 insertions(+), 125 deletions(-) delete mode 100644 theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java delete mode 100644 theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java deleted file mode 100644 index 4e9704dc9..000000000 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java +++ /dev/null @@ -1,44 +0,0 @@ -package theodolite.commons.beam; - -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.commons.configuration2.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import titan.ccp.common.configuration.ServiceConfigurations; - -/** - * Abstraction of a Beam microservice. Encapsulates the corresponding {@link PipelineOptions} and - * the beam Runner. - */ -@Deprecated -public class AbstractBeamService { - - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBeamService.class); - - // Beam Pipeline - protected PipelineOptions options; - - // Application Configurations - private final Configuration config = ServiceConfigurations.createWithDefaults(); - private final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); - - /** - * Creates AbstractBeamService with options. - */ - public AbstractBeamService(final String[] args) { // NOPMD - super(); - LOGGER.info("Pipeline options:"); - for (final String s : args) { - LOGGER.info("{}", s); - } - this.options = PipelineOptionsFactory.fromArgs(args).create(); - this.options.setJobName(this.applicationName); - LOGGER.info("Starting BeamService with PipelineOptions: {}", this.options.toString()); - } - - public Configuration getConfig() { - return this.config; - } - -} diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java deleted file mode 100644 index 939b8e3de..000000000 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java +++ /dev/null @@ -1,72 +0,0 @@ -package theodolite.commons.beam; - -import java.util.HashMap; -import java.util.Map; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.commons.configuration2.Configuration; -import org.apache.kafka.clients.consumer.ConsumerConfig; - -/** - * Abstraction of a Beam {@link Pipeline}. - */ -@Deprecated -public class AbstractPipeline extends Pipeline { - - private static final String KAFKA_CONFIG_SPECIFIC_AVRO_READER = "specific.avro.reader"; // NOPMD - private static final String KAFKA_CONFIG_SCHEMA_REGISTRY_URL = "schema.registry.url"; // NOPMD - - protected final String inputTopic; - protected final String bootstrapServer; - // Application Configurations - private final Configuration config; - - protected AbstractPipeline(final PipelineOptions options, final Configuration config) { - super(options); - this.config = config; - - this.inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); - this.bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); - } - - /** - * Builds a simple configuration for a Kafka consumer transformation. - * - * @return the build configuration. - */ - public Map<String, Object> buildConsumerConfig() { - final Map<String, Object> consumerConfig = new HashMap<>(); - consumerConfig.put( - ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); - consumerConfig.put( - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG)); - consumerConfig.put( - KAFKA_CONFIG_SCHEMA_REGISTRY_URL, - this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)); - consumerConfig.put( - KAFKA_CONFIG_SPECIFIC_AVRO_READER, - this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER)); - consumerConfig.put( - ConsumerConfig.GROUP_ID_CONFIG, - this.config.getString(ConfigurationKeys.APPLICATION_NAME)); - return consumerConfig; - } - - /** - * Builds a simple configuration for a Kafka producer transformation. - * - * @return the build configuration. - */ - public Map<String, Object> buildProducerConfig() { - final Map<String, Object> config = new HashMap<>(); - config.put( - KAFKA_CONFIG_SCHEMA_REGISTRY_URL, - this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)); - config.put( - KAFKA_CONFIG_SPECIFIC_AVRO_READER, - this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER)); - return config; - } -} diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipelineFactory.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipelineFactory.java index 2cedeb1ce..78610bd7f 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipelineFactory.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipelineFactory.java @@ -11,6 +11,11 @@ import org.apache.commons.configuration2.Configuration; import org.apache.kafka.clients.consumer.ConsumerConfig; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; +/** + * Abstract factory class for creating Beam pipelines from a {@link Configuration} and + * {@link PipelineOptions}. Implementations may expand the {@link PipelineOptions}, construct a + * {@link Pipeline} and register coders. + */ public abstract class AbstractPipelineFactory { protected final Configuration config; @@ -19,6 +24,9 @@ public abstract class AbstractPipelineFactory { this.config = configuration; } + /** + * Create a Pipeline with the configured {@link PipelineOptions}. + */ public final Pipeline create(final PipelineOptions options) { this.expandOptions(options); final Pipeline pipeline = Pipeline.create(options); diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/BeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/BeamService.java index 28f3d481f..62a9b374a 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/BeamService.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/BeamService.java @@ -10,6 +10,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import titan.ccp.common.configuration.ServiceConfigurations; +/** + * A general Apache Beam-based microservice. It is configured by Beam pipeline, a Beam runner and + * additional configuration. + */ public class BeamService { private static final Logger LOGGER = LoggerFactory.getLogger(BeamService.class); @@ -20,16 +24,27 @@ public class BeamService { private final AbstractPipelineFactory pipelineFactory; private final PipelineOptions pipelineOptions; + /** + * Create a new {@link BeamService}. + * + * @param pipelineFactoryFactory {@link Function} for creating an {@link AbstractPipelineFactory} + * based on a {@link Configuration}. + * @param runner The Beam {@link PipelineRunner} to run this pipeline. + * @param args Arguments which are treated as {@link PipelineOptions}. + */ public BeamService( - Function<Configuration, AbstractPipelineFactory> pipelineFactoryFactory, - Class<? extends PipelineRunner<?>> runner, - String[] args) { + final Function<Configuration, AbstractPipelineFactory> pipelineFactoryFactory, + final Class<? extends PipelineRunner<?>> runner, + final String... args) { this.pipelineFactory = pipelineFactoryFactory.apply(this.config); this.pipelineOptions = PipelineOptionsFactory.fromArgs(args).create(); this.pipelineOptions.setJobName(this.applicationName); this.pipelineOptions.setRunner(runner); } + /** + * Start this microservice, by running the underlying Beam pipeline. + */ public void run() { LOGGER.info("Construct Beam pipeline with pipeline options: {}", this.pipelineOptions.toString()); diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/PipelineFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/PipelineFactory.java index c01b2b389..d8d7b172d 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/PipelineFactory.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/PipelineFactory.java @@ -11,6 +11,9 @@ import theodolite.commons.beam.AbstractPipelineFactory; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import titan.ccp.model.records.ActivePowerRecord; +/** + * {@link AbstractPipelineFactory} for UC1. + */ public class PipelineFactory extends AbstractPipelineFactory { public static final String SINK_TYPE_KEY = "sink.type"; @@ -21,6 +24,7 @@ public class PipelineFactory extends AbstractPipelineFactory { @Override protected void expandOptions(final PipelineOptions options) { + // No options to set // TODO Add for PubSub // final String pubSubEmulatorHost = super.config.getString(null); // if (pubSubEmulatorHost != null) { diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/PipelineFactory.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/PipelineFactory.java index 442bc72ce..03ec7446b 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/java/application/PipelineFactory.java +++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/PipelineFactory.java @@ -24,6 +24,9 @@ import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import theodolite.commons.beam.kafka.KafkaWriterTransformation; import titan.ccp.model.records.ActivePowerRecord; +/** + * {@link AbstractPipelineFactory} for UC2. + */ public class PipelineFactory extends AbstractPipelineFactory { public PipelineFactory(final Configuration configuration) { @@ -31,7 +34,9 @@ public class PipelineFactory extends AbstractPipelineFactory { } @Override - protected void expandOptions(final PipelineOptions options) {} + protected void expandOptions(final PipelineOptions options) { + // No options to set + } @Override protected void constructPipeline(final Pipeline pipeline) { diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/application/PipelineFactory.java b/theodolite-benchmarks/uc3-beam/src/main/java/application/PipelineFactory.java index afb6464ce..e14010c5e 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/application/PipelineFactory.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/application/PipelineFactory.java @@ -25,6 +25,9 @@ import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import theodolite.commons.beam.kafka.KafkaWriterTransformation; import titan.ccp.model.records.ActivePowerRecord; +/** + * {@link AbstractPipelineFactory} for UC3. + */ public class PipelineFactory extends AbstractPipelineFactory { public PipelineFactory(final Configuration configuration) { @@ -32,7 +35,9 @@ public class PipelineFactory extends AbstractPipelineFactory { } @Override - protected void expandOptions(final PipelineOptions options) {} + protected void expandOptions(final PipelineOptions options) { + // No options to set + } @Override protected void constructPipeline(final Pipeline pipeline) { diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java index 108b85d73..25420958d 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java @@ -1,6 +1,5 @@ package application; - import org.apache.beam.runners.samza.SamzaRunner; import theodolite.commons.beam.BeamService; diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/PipelineFactory.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/PipelineFactory.java index 323779b71..c6ec79a1f 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/PipelineFactory.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/PipelineFactory.java @@ -1,4 +1,4 @@ -package application; +package application; // NOPMD import com.google.common.math.StatsAccumulator; import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; @@ -50,6 +50,9 @@ import titan.ccp.configuration.events.Event; import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.AggregatedActivePowerRecord; +/** + * {@link AbstractPipelineFactory} for UC4. + */ public class PipelineFactory extends AbstractPipelineFactory { public PipelineFactory(final Configuration configuration) { @@ -57,10 +60,12 @@ public class PipelineFactory extends AbstractPipelineFactory { } @Override - protected void expandOptions(final PipelineOptions options) {} + protected void expandOptions(final PipelineOptions options) { + // No options to set + } @Override - protected void constructPipeline(final Pipeline pipeline) { + protected void constructPipeline(final Pipeline pipeline) { // NOPMD // Additional needed variables final String feedbackTopic = this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); -- GitLab