diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java
index debfd967ea956f94b939d686b87ce6d29254e449..289de6c30f41efa6162adc8058824fb9872fb211 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java
@@ -1,55 +1,41 @@
 package theodolite.commons.beam;
 
-import java.util.Properties;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.commons.configuration2.Configuration;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import titan.ccp.common.configuration.ServiceConfigurations;
 
 /**
  * Abstraction of a beam microservice.
+ * Encapsulates the corresponding {@link PipelineOptions} and the Beam runner.
  */
 public class AbstractBeamService {
 
-  // Application Configurations
-  public static final Configuration CONFIG = ServiceConfigurations.createWithDefaults();
-  public static final String APPLICATION_NAME =
-      CONFIG.getString(ConfigurationKeys.APPLICATION_NAME);
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBeamService.class);
 
   // Beam Pipeline
   protected PipelineOptions options;
 
-  public AbstractBeamService(final String[] args) { //NOPMD
-    options = PipelineOptionsFactory.fromArgs(args).create();
-    options.setJobName(APPLICATION_NAME);
-  }
+  // Application Configurations
+  private final Configuration config = ServiceConfigurations.createWithDefaults();
+  private final String applicationName =
+      config.getString(ConfigurationKeys.APPLICATION_NAME);
 
   /**
-   * Abstract main for a Beam Service.
+   * Creates an AbstractBeamService with {@link PipelineOptions} parsed from the given arguments.
    */
-  public static void main(final String[] args){} //NOPMD
+  public AbstractBeamService(final String[] args) { //NOPMD
+    super();
+    options = PipelineOptionsFactory.fromArgs(args).create();
+    options.setJobName(applicationName);
+    LOGGER.info("Starting BeamService with PipelineOptions: {}", this.options);
+  }
 
-  /**
-   * Builds a simple configuration for a Kafka consumer.
-   *
-   * @return the build Kafka consumer configuration.
-   */
-  public Properties buildConsumerConfig() {
-    final Properties consumerConfig = new Properties();
-    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-        CONFIG.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
-    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        CONFIG
-            .getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
-    consumerConfig.put("schema.registry.url",
-        CONFIG.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
-
-    consumerConfig.put("specific.avro.reader",
-        CONFIG.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APPLICATION_NAME);
-    return consumerConfig;
+  public Configuration getConfig() {
+    return config;
   }
 }
diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java
new file mode 100644
index 0000000000000000000000000000000000000000..7588e6abe3fad99761da331105189b41bb17cba0
--- /dev/null
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java
@@ -0,0 +1,44 @@
+package theodolite.commons.beam;
+
+import java.util.Properties;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+/**
+ * Abstraction of a Beam {@link Pipeline}.
+ */
+public class AbstractPipeline extends Pipeline {
+
+  // Application Configurations
+  private final Configuration config;
+
+  protected AbstractPipeline(final PipelineOptions options, final Configuration config) {
+    super(options);
+    this.config = config;
+  }
+
+  /**
+   * Builds a simple configuration for a Kafka consumer transformation.
+   *
+   * @return the built configuration.
+   */
+  public Properties buildConsumerConfig() {
+    final Properties consumerConfig = new Properties();
+    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+        config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
+    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        config
+            .getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
+    consumerConfig.put("schema.registry.url",
+        config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
+
+    consumerConfig.put("specific.avro.reader",
+        config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
+
+    final String applicationName = config.getString(ConfigurationKeys.APPLICATION_NAME);
+    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, applicationName);
+    return consumerConfig;
+  }
+}
diff --git a/theodolite-benchmarks/settings.gradle b/theodolite-benchmarks/settings.gradle
index 7470f7a45f2ffe666cb17442e72359d1f4133abc..d6db9202ca1c2142aa8379d8fd8fd27f43f80a42 100644
--- a/theodolite-benchmarks/settings.gradle
+++ b/theodolite-benchmarks/settings.gradle
@@ -5,6 +5,8 @@ include 'kstreams-commons'
 include 'flink-commons'
 include 'beam-commons'
 
+include 'uc1-beam'
+
 include 'uc1-load-generator'
 include 'uc1-kstreams'
 include 'uc1-flink'
diff --git a/theodolite-benchmarks/uc1-beam-flink/build.gradle b/theodolite-benchmarks/uc1-beam-flink/build.gradle
index e7ac89dfa82d3473a96231eab128955f68492868..ee92fb9b44e6a380348691019affea93320984c4 100644
--- a/theodolite-benchmarks/uc1-beam-flink/build.gradle
+++ b/theodolite-benchmarks/uc1-beam-flink/build.gradle
@@ -4,6 +4,7 @@ plugins {
 
 dependencies {
   compile group: 'org.apache.beam', name: 'beam-runners-flink-1.12', version: '2.27.0'
+  compile project(':uc1-beam')
 }
 
-mainClassName = "application.Uc1ApplicationBeam"
+mainClassName = "application.Uc1BeamFlink"
diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java
deleted file mode 100644
index b794146c0ae6a1194590fa08562836de1d43c68f..0000000000000000000000000000000000000000
--- a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1ApplicationBeam.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package application;
-
-import com.google.gson.Gson;
-import java.util.Properties;
-import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import theodolite.commons.beam.AbstractBeamService;
-import theodolite.commons.beam.ConfigurationKeys;
-import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader;
-import titan.ccp.model.records.ActivePowerRecord;
-
-
-/**
- * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To
- * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload
- * generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST adress
- * using--flinkMaster as run parameter. To persist logs add
- * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard
- * Input Output in Common in the Run Configuration Start via Eclipse Run.
- */
-public final class Uc1ApplicationBeam extends AbstractBeamService {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(Uc1ApplicationBeam.class);
-  private final String inputTopic = CONFIG.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
-  private final String bootstrapServer =
-      CONFIG.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
-
-  /**
-   * Private constructor setting specific options for this use case.
-   */
-  private Uc1ApplicationBeam(final String[] args) { //NOPMD
-    super(args);
-    LOGGER.info(this.options.toString());
-    this.options.setRunner(FlinkRunner.class);
-  }
-
-  /**
-   * Main method.
-   */
-  @SuppressWarnings({"unchecked", "rawtypes", "unused"})
-  public static void main(final String[] args) {
-
-    final Uc1ApplicationBeam uc1 = new Uc1ApplicationBeam(args);
-
-    // create pipeline
-    final Pipeline pipeline = Pipeline.create(uc1.options);
-
-    // Set Coders for Classes that will be distributed
-    final CoderRegistry cr = pipeline.getCoderRegistry();
-    cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
-
-    // build KafkaConsumerConfig
-    final Properties consumerConfig = uc1.buildConsumerConfig();
-
-    // Create Pipeline transformations
-    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
-        new KafkaActivePowerRecordReader(uc1.bootstrapServer, uc1.inputTopic, consumerConfig);
-
-    final LogKeyValue logKeyValue = new LogKeyValue();
-
-    // Apply pipeline transformations
-    // Read from Kafka
-    pipeline.apply(kafka)
-        // Map to Gson
-        .apply(MapElements
-            .via(
-                new SimpleFunction<KV<String, ActivePowerRecord>, KV<String, String>>() {
-                  private transient Gson gsonObj = new Gson();
-
-                  @Override
-                  public KV<String, String> apply(
-                      final KV<String, ActivePowerRecord> kv) {
-                    if (this.gsonObj == null) {
-                      this.gsonObj = new Gson();
-                    }
-                    final String gson = this.gsonObj.toJson(kv.getValue());
-                    return KV.of(kv.getKey(), gson);
-                  }
-                }))
-        // Print to console
-        .apply(ParDo.of(logKeyValue));
-    // Submit job and start execution
-    pipeline.run().waitUntilFinish();
-  }
-
-
-  /**
-   * Logs all Key Value pairs.
-   */
-  @SuppressWarnings({"unused"})
-  private static class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> {
-    private static final long serialVersionUID = 4328743;
-
-    @ProcessElement
-    public void processElement(@Element final KV<String, String> kv,
-        final OutputReceiver<KV<String, String>> out) {
-      if (LOGGER.isInfoEnabled()) {
-        LOGGER.info("Key: " + kv.getKey() + "Value: " + kv.getValue());
-      }
-    }
-  }
-}
-
diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java
new file mode 100644
index 0000000000000000000000000000000000000000..62800cccb23769b7cccdf04a90ef9118c4182db6
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java
@@ -0,0 +1,41 @@
+package application;
+
+import org.apache.beam.runners.flink.FlinkRunner;
+import theodolite.commons.beam.AbstractBeamService;
+
+/**
+ * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To
+ * execute locally in standalone mode, start Kafka, ZooKeeper, the Schema Registry and the
+ * workload generator using the delayed_startup.sh script. Start a Flink cluster and pass its
+ * REST address via the --flinkMaster run parameter. To persist logs, add
+ * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as output file under Standard
+ * Input and Output in the Common tab of the run configuration and start via Eclipse Run.
+ */
+public final class Uc1BeamFlink extends AbstractBeamService {
+
+  /**
+   * Private constructor setting specific options for this use case.
+   */
+  private Uc1BeamFlink(final String[] args) { //NOPMD
+    super(args);
+    this.options.setRunner(FlinkRunner.class);
+  }
+
+  /**
+   * Main method.
+   */
+  @SuppressWarnings({"unchecked", "rawtypes", "unused"})
+  public static void main(final String[] args) {
+
+    // Create application via configurations
+    final Uc1BeamFlink uc1 = new Uc1BeamFlink(args);
+
+    // Create pipeline with configurations
+    final Uc1BeamPipeline pipeline = new Uc1BeamPipeline(uc1.options, uc1.getConfig());
+
+    // Submit job and start execution
+    pipeline.run().waitUntilFinish();
+  }
+
+}
+
diff --git a/theodolite-benchmarks/uc1-beam/build.gradle b/theodolite-benchmarks/uc1-beam/build.gradle
new file mode 100644
index 0000000000000000000000000000000000000000..502e94fa737fb2ae1bab861407b27575cd8766ca
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/build.gradle
@@ -0,0 +1,5 @@
+plugins {
+  id 'theodolite.beam'
+}
+
+
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java
new file mode 100644
index 0000000000000000000000000000000000000000..e791d136294eb902e28f03063797901d63e4971f
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java
@@ -0,0 +1,24 @@
+package application;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Logs all key-value pairs and forwards them unchanged.
+ */ +@SuppressWarnings({"unused"}) +public class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> { + private static final long serialVersionUID = 4328743; + private static final Logger LOGGER = LoggerFactory.getLogger(LogKeyValue.class); + + @ProcessElement + public void processElement(@Element final KV<String, String> kv, + final OutputReceiver<KV<String, String>> out) { + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Key: {}, Value: {}", kv.getKey(), kv.getValue()); + } + out.output(kv); + } +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/MapToGson.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/MapToGson.java new file mode 100644 index 0000000000000000000000000000000000000000..c83fe2a3da9473e7750346d6f84fb12fb6432796 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/MapToGson.java @@ -0,0 +1,21 @@ +package application; + +import com.google.gson.Gson; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.KV; +import titan.ccp.model.records.ActivePowerRecord; + +public class MapToGson extends SimpleFunction<KV<String, ActivePowerRecord>, KV<String, String>> { + private static final long serialVersionUID = 7168356203579050214L; + private transient Gson gsonObj = new Gson(); + + @Override + public KV<String, String> apply( + final KV<String, ActivePowerRecord> kv) { + if (this.gsonObj == null) { + this.gsonObj = new Gson(); + } + final String gson = this.gsonObj.toJson(kv.getValue()); + return KV.of(kv.getKey(), gson); + } +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java new file mode 100644 index 0000000000000000000000000000000000000000..b0f6c8646337b616f3d616998d5c896632fe8767 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java @@ -0,0 +1,65 @@ +package application; + +import java.util.Properties; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.configuration2.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import theodolite.commons.beam.AbstractPipeline; +import theodolite.commons.beam.ConfigurationKeys; +import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader; +import titan.ccp.model.records.ActivePowerRecord; + + +/** + * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To + * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload + * generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST adress + * using--flinkMaster as run parameter. To persist logs add + * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard + * Input Output in Common in the Run Configuration Start via Eclipse Run. 
+ */
+public final class Uc1BeamPipeline extends AbstractPipeline {
+
+  Uc1BeamPipeline(final PipelineOptions options, final Configuration config) {
+    super(options, config);
+    // Additional configuration values required by the Kafka source
+    final String inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
+    final String bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+
+    // Set coders for classes that will be distributed
+    final CoderRegistry cr = this.getCoderRegistry();
+    cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
+
+    // Build the Kafka consumer configuration
+    final Properties consumerConfig = buildConsumerConfig();
+
+    // Create pipeline transformations
+    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
+        new KafkaActivePowerRecordReader(bootstrapServer, inputTopic, consumerConfig);
+
+    final LogKeyValue logKeyValue = new LogKeyValue();
+    final MapToGson mapToGson = new MapToGson();
+
+    // Apply pipeline transformations
+    // Read from Kafka
+    this.apply(kafka)
+        // Map to Gson
+        .apply(MapElements.via(mapToGson))
+        // Print to console
+        .apply(ParDo.of(logKeyValue));
+  }
+}
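Note on reuse (not part of the patch): after this refactoring, the runner-independent logic lives in the uc1-beam module, so supporting another Beam runner only requires a thin service class analogous to Uc1BeamFlink. Below is a minimal sketch for Beam's DirectRunner; the class name Uc1BeamDirect is hypothetical and a beam-runners-direct-java dependency is assumed.

package application;

import org.apache.beam.runners.direct.DirectRunner;
import theodolite.commons.beam.AbstractBeamService;

/**
 * Hypothetical local variant of the use case, mirroring Uc1BeamFlink but using
 * Beam's DirectRunner instead of the FlinkRunner (assumed setup, for illustration).
 */
public final class Uc1BeamDirect extends AbstractBeamService {

  private Uc1BeamDirect(final String[] args) {
    super(args);
    // Selecting the runner is the only runner-specific step left in the service
    this.options.setRunner(DirectRunner.class);
  }

  public static void main(final String[] args) {
    final Uc1BeamDirect uc1 = new Uc1BeamDirect(args);
    // Reuse the runner-independent pipeline from the uc1-beam module
    final Uc1BeamPipeline pipeline = new Uc1BeamPipeline(uc1.options, uc1.getConfig());
    pipeline.run().waitUntilFinish();
  }
}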