Skip to content
Snippets Groups Projects
Commit b874591c authored by Sören Henning's avatar Sören Henning
Browse files

Minor code cleanup

parent 98f8e1c8
No related branches found
No related tags found
1 merge request: !245 "Firestore sink for UC1 Beam"
Pipeline #6642 passed
...@@ -9,14 +9,8 @@ import theodolite.commons.beam.AbstractPipeline; ...@@ -9,14 +9,8 @@ import theodolite.commons.beam.AbstractPipeline;
import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To * Implementation of benchmark UC1: Database Storage with Apache Beam.
* execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload
* generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST adress
* using--flinkMaster as run parameter. To persist logs add
* ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard
* Input Output in Common in the Run Configuration Start via Eclipse Run.
*/ */
public final class Uc1BeamPipeline extends AbstractPipeline { public final class Uc1BeamPipeline extends AbstractPipeline {
...@@ -27,19 +21,16 @@ public final class Uc1BeamPipeline extends AbstractPipeline { ...@@ -27,19 +21,16 @@ public final class Uc1BeamPipeline extends AbstractPipeline {
final SinkType sinkType = SinkType.from(config.getString(SINK_TYPE_KEY)); final SinkType sinkType = SinkType.from(config.getString(SINK_TYPE_KEY));
// Set Coders for Classes that will be distributed // Set Coders for classes that will be distributed
final CoderRegistry cr = this.getCoderRegistry(); final CoderRegistry cr = super.getCoderRegistry();
cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
// Create Pipeline transformations
final KafkaActivePowerTimestampReader kafka = new KafkaActivePowerTimestampReader( final KafkaActivePowerTimestampReader kafka = new KafkaActivePowerTimestampReader(
this.bootstrapServer, super.bootstrapServer,
this.inputTopic, super.inputTopic,
this.buildConsumerConfig()); super.buildConsumerConfig());
// Apply pipeline transformations super.apply(kafka)
// Read from Kafka
this.apply(kafka)
.apply(Values.create()) .apply(Values.create())
.apply(sinkType.create(config)); .apply(sinkType.create(config));
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment