Skip to content
Snippets Groups Projects
Commit 777ad136 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Add generic Reader from Kafka

Just in usecase 4 for now
parent 2e825d40
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
package theodolite.commons.beam.kafka;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
/**
* Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
*
* @param <K> Type of the Key.
* @param <V> Type of the Value.
*/
public class KafkaGenericReader<K, V> extends
PTransform<PBegin, PCollection<KV<K, V>>> {
private static final long serialVersionUID = 2603286150183186115L;
private final PTransform<PBegin, PCollection<KV<K, V>>> reader;
/**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public KafkaGenericReader(final String bootstrapServer, final String inputTopic,
final Map consumerConfig,
final Class<? extends
org.apache.kafka.common.serialization.Deserializer<K>>
keyDeserializer,
final Class<? extends
org.apache.kafka.common.serialization.Deserializer<V>>
valueDeserializer) {
super();
// Check if boostrap server and inputTopic are defined
if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) {
throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
}
reader =
KafkaIO.<K, V>read()
.withBootstrapServers(bootstrapServer)
.withTopic(inputTopic)
.withKeyDeserializer(keyDeserializer)
.withValueDeserializer(valueDeserializer)
.withConsumerConfigUpdates(consumerConfig)
.withoutMetadata();
}
@Override
public PCollection<KV<K, V>> expand(final PBegin input) {
return input.apply(this.reader);
}
}
......@@ -40,6 +40,7 @@ import serialization.SensorParentKeyCoder;
import theodolite.commons.beam.AbstractPipeline;
import theodolite.commons.beam.ConfigurationKeys;
import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
import theodolite.commons.beam.kafka.KafkaGenericReader;
import theodolite.commons.beam.kafka.KafkaWriterTransformation;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.records.ActivePowerRecord;
......@@ -84,10 +85,18 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
registerCoders(cr);
// Read from Kafka
// ActivePowerRecords
final KafkaActivePowerTimestampReader
kafkaActivePowerRecordReader =
new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig);
//Configuration Events
final KafkaGenericReader<Event, String>
kafkaConfigurationReader =
new KafkaGenericReader<>(
bootstrapServer, configurationTopic, configurationConfig,
EventDeserializer.class, StringDeserializer.class);
// Transform into AggregatedActivePowerRecords into ActivePowerRecords
final AggregatedToActive aggregatedToActive = new AggregatedToActive();
......@@ -100,11 +109,9 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
new KafkaWriterTransformation<>(
bootstrapServer, feedbackTopic, AggregatedActivePowerRecordSerializer.class);
// Apply pipeline transformations
// Read from Kafka
final PCollection<KV<String, ActivePowerRecord>> values = this
.apply(kafkaActivePowerRecordReader)
.apply("Read from Kafka", kafkaActivePowerRecordReader)
.apply("Read Windows", Window.into(FixedWindows.of(duration)))
.apply("Set trigger for input", Window
.<KV<String, ActivePowerRecord>>configure()
......@@ -148,13 +155,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
// Build the configuration stream from a changelog.
final PCollection<KV<String, Set<String>>> configurationStream = this
.apply("Read sensor groups", KafkaIO.<Event, String>read()
.withBootstrapServers(bootstrapServer)
.withTopic(configurationTopic)
.withKeyDeserializer(EventDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withConsumerConfigUpdates(configurationConfig)
.withoutMetadata())
.apply("Read sensor groups", kafkaConfigurationReader)
// Only forward relevant changes in the hierarchy
.apply("Filter changed and status events",
Filter.by(new FilterEvents()))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment