diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
index eb894d13b38c46eb63136c2f670dfdf7e091356f..352b32a29ff6cfd5d01a4e74798f79c8d08c769a 100644
--- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
@@ -9,14 +9,8 @@ import theodolite.commons.beam.AbstractPipeline;
 import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
 import titan.ccp.model.records.ActivePowerRecord;
-
 /**
- * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To
- * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload
- * generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST adress
- * using--flinkMaster as run parameter. To persist logs add
- * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard
- * Input Output in Common in the Run Configuration Start via Eclipse Run.
+ * Implementation of benchmark UC1: Database Storage with Apache Beam.
  */
 public final class Uc1BeamPipeline extends AbstractPipeline {
 
@@ -27,19 +21,16 @@ public final class Uc1BeamPipeline extends AbstractPipeline {
 
     final SinkType sinkType = SinkType.from(config.getString(SINK_TYPE_KEY));
 
-    // Set Coders for Classes that will be distributed
-    final CoderRegistry cr = this.getCoderRegistry();
+    // Set Coders for classes that will be distributed
+    final CoderRegistry cr = super.getCoderRegistry();
     cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
 
-    // Create Pipeline transformations
     final KafkaActivePowerTimestampReader kafka = new KafkaActivePowerTimestampReader(
-        this.bootstrapServer,
-        this.inputTopic,
-        this.buildConsumerConfig());
+        super.bootstrapServer,
+        super.inputTopic,
+        super.buildConsumerConfig());
 
-    // Apply pipeline transformations
-    // Read from Kafka
-    this.apply(kafka)
+    super.apply(kafka)
         .apply(Values.create())
         .apply(sinkType.create(config));
   }
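
Note: the `this.` to `super.` change is stylistic; `getCoderRegistry()` and `apply(...)` are inherited, presumably from Beam's `Pipeline`, which `AbstractPipeline` appears to extend, so behavior is unchanged. The sketch below is a self-contained illustration of the same coder-registration and transform chain, with `Create` standing in for `KafkaActivePowerTimestampReader` and a trivial map standing in for `sinkType.create(config)`. The class name, sample values, and the assumed `ActivePowerRecord` constructor and getter shapes are illustrative, not part of this patch.

    // Sketch only: assumes the Beam SDK and Titan CCP records on the classpath.
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.coders.CoderRegistry;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import titan.ccp.model.records.ActivePowerRecord;

    public final class CoderRegistrationSketch { // hypothetical class name

      public static void main(final String[] args) {
        final Pipeline pipeline =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Avro-generated classes are not Java-serializable, so records shipped
        // between workers need an explicit coder; same registration as the patch.
        final CoderRegistry cr = pipeline.getCoderRegistry();
        cr.registerCoderForClass(ActivePowerRecord.class,
            AvroCoder.of(ActivePowerRecord.SCHEMA$));

        pipeline
            // Stand-in for the Kafka reader: one keyed record with sample values.
            .apply(Create.of(
                KV.of("sensor0", new ActivePowerRecord("sensor0", 0L, 100.0))))
            // Same step as the patch: drop the Kafka key, keep the record.
            .apply(Values.create())
            // Stand-in for the configured sink: map each record to a string.
            .apply(MapElements.into(TypeDescriptors.strings())
                .via((ActivePowerRecord r) -> r.getIdentifier() + ": " + r.getValueInW()));

        pipeline.run().waitUntilFinish();
      }
    }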