Skip to content
Snippets Groups Projects
Commit 86d4732c authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Add ussage of common Kafka-Reader in uc1-beam-flink

parent 66ae4c77
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
...@@ -2,6 +2,8 @@ package theodolite.commons.beam.kafka; ...@@ -2,6 +2,8 @@ package theodolite.commons.beam.kafka;
import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Map; import java.util.Map;
import java.util.Properties;
import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.PTransform;
...@@ -25,7 +27,7 @@ public class KafkaAggregatedPowerRecordReader extends ...@@ -25,7 +27,7 @@ public class KafkaAggregatedPowerRecordReader extends
*/ */
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public KafkaAggregatedPowerRecordReader(String bootstrapServer, String inputTopic, public KafkaAggregatedPowerRecordReader(String bootstrapServer, String inputTopic,
Map<Object, Object> consumerConfig) { Properties consumerConfig) {
super(); super();
// Check if boostrap server and inputTopic are defined // Check if boostrap server and inputTopic are defined
......
...@@ -21,6 +21,7 @@ import org.slf4j.Logger; ...@@ -21,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import theodolite.commons.beam.AbstractBeamService; import theodolite.commons.beam.AbstractBeamService;
import theodolite.commons.beam.ConfigurationKeys; import theodolite.commons.beam.ConfigurationKeys;
import theodolite.commons.beam.kafka.KafkaAggregatedPowerRecordReader;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/** /**
...@@ -68,14 +69,7 @@ public final class Uc1ApplicationBeam extends AbstractBeamService { ...@@ -68,14 +69,7 @@ public final class Uc1ApplicationBeam extends AbstractBeamService {
// Create Pipeline transformations // Create Pipeline transformations
final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka = final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
KafkaIO.<String, ActivePowerRecord>read() new KafkaAggregatedPowerRecordReader(uc1.bootstrapServer, uc1.inputTopic, consumerConfig);
.withBootstrapServers(uc1.bootstrapServer)
.withTopic(uc1.inputTopic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
AvroCoder.of(ActivePowerRecord.class))
.withConsumerConfigUpdates(consumerConfig)
.withoutMetadata();
final LogKeyValue logKeyValue = new LogKeyValue(); final LogKeyValue logKeyValue = new LogKeyValue();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment