diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java deleted file mode 100644 index 4e9704dc9b08b3c000c60799e73802ca44ac7699..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java +++ /dev/null @@ -1,44 +0,0 @@ -package theodolite.commons.beam; - -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.commons.configuration2.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import titan.ccp.common.configuration.ServiceConfigurations; - -/** - * Abstraction of a Beam microservice. Encapsulates the corresponding {@link PipelineOptions} and - * the beam Runner. - */ -@Deprecated -public class AbstractBeamService { - - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBeamService.class); - - // Beam Pipeline - protected PipelineOptions options; - - // Application Configurations - private final Configuration config = ServiceConfigurations.createWithDefaults(); - private final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); - - /** - * Creates AbstractBeamService with options. - */ - public AbstractBeamService(final String[] args) { // NOPMD - super(); - LOGGER.info("Pipeline options:"); - for (final String s : args) { - LOGGER.info("{}", s); - } - this.options = PipelineOptionsFactory.fromArgs(args).create(); - this.options.setJobName(this.applicationName); - LOGGER.info("Starting BeamService with PipelineOptions: {}", this.options.toString()); - } - - public Configuration getConfig() { - return this.config; - } - -} diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java deleted file mode 100644 index 939b8e3de62fea445078359523e0fe127a2346e1..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java +++ /dev/null @@ -1,72 +0,0 @@ -package theodolite.commons.beam; - -import java.util.HashMap; -import java.util.Map; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.commons.configuration2.Configuration; -import org.apache.kafka.clients.consumer.ConsumerConfig; - -/** - * Abstraction of a Beam {@link Pipeline}. - */ -@Deprecated -public class AbstractPipeline extends Pipeline { - - private static final String KAFKA_CONFIG_SPECIFIC_AVRO_READER = "specific.avro.reader"; // NOPMD - private static final String KAFKA_CONFIG_SCHEMA_REGISTRY_URL = "schema.registry.url"; // NOPMD - - protected final String inputTopic; - protected final String bootstrapServer; - // Application Configurations - private final Configuration config; - - protected AbstractPipeline(final PipelineOptions options, final Configuration config) { - super(options); - this.config = config; - - this.inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); - this.bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); - } - - /** - * Builds a simple configuration for a Kafka consumer transformation. - * - * @return the build configuration. - */ - public Map<String, Object> buildConsumerConfig() { - final Map<String, Object> consumerConfig = new HashMap<>(); - consumerConfig.put( - ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); - consumerConfig.put( - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG)); - consumerConfig.put( - KAFKA_CONFIG_SCHEMA_REGISTRY_URL, - this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)); - consumerConfig.put( - KAFKA_CONFIG_SPECIFIC_AVRO_READER, - this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER)); - consumerConfig.put( - ConsumerConfig.GROUP_ID_CONFIG, - this.config.getString(ConfigurationKeys.APPLICATION_NAME)); - return consumerConfig; - } - - /** - * Builds a simple configuration for a Kafka producer transformation. - * - * @return the build configuration. - */ - public Map<String, Object> buildProducerConfig() { - final Map<String, Object> config = new HashMap<>(); - config.put( - KAFKA_CONFIG_SCHEMA_REGISTRY_URL, - this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)); - config.put( - KAFKA_CONFIG_SPECIFIC_AVRO_READER, - this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER)); - return config; - } -} diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipelineFactory.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipelineFactory.java index 2cedeb1ce3134fd8dfcd614ba2b47605e27d31f8..78610bd7fd874c5df309e5bf3e1cee8cb679c004 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipelineFactory.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipelineFactory.java @@ -11,6 +11,11 @@ import org.apache.commons.configuration2.Configuration; import org.apache.kafka.clients.consumer.ConsumerConfig; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; +/** + * Abstract factory class for creating Beam pipelines from a {@link Configuration} and + * {@link PipelineOptions}. Implementations may expand the {@link PipelineOptions}, construct a + * {@link Pipeline} and register coders. + */ public abstract class AbstractPipelineFactory { protected final Configuration config; @@ -19,6 +24,9 @@ public abstract class AbstractPipelineFactory { this.config = configuration; } + /** + * Create a Pipeline with the configured {@link PipelineOptions}. + */ public final Pipeline create(final PipelineOptions options) { this.expandOptions(options); final Pipeline pipeline = Pipeline.create(options); diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/BeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/BeamService.java index 28f3d481f384a1fd3d8cc3873033fe80f6b04d20..62a9b374afbfede283a01edc8cbb5c705d076191 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/BeamService.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/BeamService.java @@ -10,6 +10,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import titan.ccp.common.configuration.ServiceConfigurations; +/** + * A general Apache Beam-based microservice. It is configured by Beam pipeline, a Beam runner and + * additional configuration. + */ public class BeamService { private static final Logger LOGGER = LoggerFactory.getLogger(BeamService.class); @@ -20,16 +24,27 @@ public class BeamService { private final AbstractPipelineFactory pipelineFactory; private final PipelineOptions pipelineOptions; + /** + * Create a new {@link BeamService}. + * + * @param pipelineFactoryFactory {@link Function} for creating an {@link AbstractPipelineFactory} + * based on a {@link Configuration}. + * @param runner The Beam {@link PipelineRunner} to run this pipeline. + * @param args Arguments which are treated as {@link PipelineOptions}. + */ public BeamService( - Function<Configuration, AbstractPipelineFactory> pipelineFactoryFactory, - Class<? extends PipelineRunner<?>> runner, - String[] args) { + final Function<Configuration, AbstractPipelineFactory> pipelineFactoryFactory, + final Class<? extends PipelineRunner<?>> runner, + final String... args) { this.pipelineFactory = pipelineFactoryFactory.apply(this.config); this.pipelineOptions = PipelineOptionsFactory.fromArgs(args).create(); this.pipelineOptions.setJobName(this.applicationName); this.pipelineOptions.setRunner(runner); } + /** + * Start this microservice, by running the underlying Beam pipeline. + */ public void run() { LOGGER.info("Construct Beam pipeline with pipeline options: {}", this.pipelineOptions.toString()); diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/PipelineFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/PipelineFactory.java index c01b2b3892cefe23264af8a5ecc29fe5927e88f4..d8d7b172dbb7a132886784ae5b1f0682971b98b5 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/PipelineFactory.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/PipelineFactory.java @@ -11,6 +11,9 @@ import theodolite.commons.beam.AbstractPipelineFactory; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import titan.ccp.model.records.ActivePowerRecord; +/** + * {@link AbstractPipelineFactory} for UC1. + */ public class PipelineFactory extends AbstractPipelineFactory { public static final String SINK_TYPE_KEY = "sink.type"; @@ -21,6 +24,7 @@ public class PipelineFactory extends AbstractPipelineFactory { @Override protected void expandOptions(final PipelineOptions options) { + // No options to set // TODO Add for PubSub // final String pubSubEmulatorHost = super.config.getString(null); // if (pubSubEmulatorHost != null) { diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/PipelineFactory.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/PipelineFactory.java index 442bc72ceb7a3241becdb75e6ca6dd9ca27bfd8f..03ec7446be0351dc0939434788de4feceed89661 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/java/application/PipelineFactory.java +++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/PipelineFactory.java @@ -24,6 +24,9 @@ import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import theodolite.commons.beam.kafka.KafkaWriterTransformation; import titan.ccp.model.records.ActivePowerRecord; +/** + * {@link AbstractPipelineFactory} for UC2. + */ public class PipelineFactory extends AbstractPipelineFactory { public PipelineFactory(final Configuration configuration) { @@ -31,7 +34,9 @@ public class PipelineFactory extends AbstractPipelineFactory { } @Override - protected void expandOptions(final PipelineOptions options) {} + protected void expandOptions(final PipelineOptions options) { + // No options to set + } @Override protected void constructPipeline(final Pipeline pipeline) { diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/application/PipelineFactory.java b/theodolite-benchmarks/uc3-beam/src/main/java/application/PipelineFactory.java index afb6464ce1d4ad2545a3508640051ea31c93c04f..e14010c5e70a058831e5fd5fcfe0bdb042b13c48 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/application/PipelineFactory.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/application/PipelineFactory.java @@ -25,6 +25,9 @@ import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import theodolite.commons.beam.kafka.KafkaWriterTransformation; import titan.ccp.model.records.ActivePowerRecord; +/** + * {@link AbstractPipelineFactory} for UC3. + */ public class PipelineFactory extends AbstractPipelineFactory { public PipelineFactory(final Configuration configuration) { @@ -32,7 +35,9 @@ public class PipelineFactory extends AbstractPipelineFactory { } @Override - protected void expandOptions(final PipelineOptions options) {} + protected void expandOptions(final PipelineOptions options) { + // No options to set + } @Override protected void constructPipeline(final Pipeline pipeline) { diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java index 108b85d7376e956607cf265f4c78ce1d2fe2fc8d..25420958d327370138c668ba66fdd9840055daf0 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java @@ -1,6 +1,5 @@ package application; - import org.apache.beam.runners.samza.SamzaRunner; import theodolite.commons.beam.BeamService; diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/PipelineFactory.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/PipelineFactory.java index 323779b71075595d854024a4b62421e94056c778..c6ec79a1f0c1d03bdb19ce23e6b0dba3f83653dc 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/PipelineFactory.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/PipelineFactory.java @@ -1,4 +1,4 @@ -package application; +package application; // NOPMD import com.google.common.math.StatsAccumulator; import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; @@ -50,6 +50,9 @@ import titan.ccp.configuration.events.Event; import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.AggregatedActivePowerRecord; +/** + * {@link AbstractPipelineFactory} for UC4. + */ public class PipelineFactory extends AbstractPipelineFactory { public PipelineFactory(final Configuration configuration) { @@ -57,10 +60,12 @@ public class PipelineFactory extends AbstractPipelineFactory { } @Override - protected void expandOptions(final PipelineOptions options) {} + protected void expandOptions(final PipelineOptions options) { + // No options to set + } @Override - protected void constructPipeline(final Pipeline pipeline) { + protected void constructPipeline(final Pipeline pipeline) { // NOPMD // Additional needed variables final String feedbackTopic = this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);