Skip to content
Snippets Groups Projects

Migrate Beam benchmark implementation

Compare and Show latest version
7 files
+ 97
51
Compare changes
  • Side-by-side
  • Inline
Files
7
package theodolite.commons.beam.kafka;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Map;
import java.util.Properties;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
@@ -14,9 +14,10 @@ import titan.ccp.model.records.ActivePowerRecord;
/**
* Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
*/
public class KafkaAggregatedPowerRecordReader extends
public class KafkaActivePowerRecordReader extends
PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
private static final long serialVersionUID = 2603286150183186115L;
private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
@@ -24,8 +25,8 @@ public class KafkaAggregatedPowerRecordReader extends
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public KafkaAggregatedPowerRecordReader(String bootstrapServer, String inputTopic,
Map<Object, Object> consumerConfig) {
public KafkaActivePowerRecordReader(final String bootstrapServer, final String inputTopic,
final Properties consumerConfig) {
super();
// Check if boostrap server and inputTopic are defined
@@ -45,7 +46,7 @@ public class KafkaAggregatedPowerRecordReader extends
}
@Override
public PCollection<KV<String, ActivePowerRecord>> expand(PBegin input) {
public PCollection<KV<String, ActivePowerRecord>> expand(final PBegin input) {
return input.apply(this.reader);
}
Loading