Skip to content
Snippets Groups Projects
Commit bbe7169c authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Fix uc4-beam

refactor + use Generalized KafkaReader and writer
parent 70492a50
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
package theodolite.commons.beam;
import java.util.HashMap;
import java.util.Properties;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.commons.configuration2.Configuration;
......@@ -31,8 +30,8 @@ public class AbstractPipeline extends Pipeline {
*
* @return the build configuration.
*/
public HashMap buildConsumerConfig() {
final HashMap consumerConfig = new HashMap();
public HashMap<String, Object> buildConsumerConfig() {
final HashMap<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
......
......@@ -13,7 +13,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
* where the value type can be generic.
* @param <T> type of the value.
*/
public class KafkaWriterTransformation<T extends Serializer> extends
public class KafkaWriterTransformation<T> extends
PTransform<PCollection<KV<String, T>>, PDone> {
private static final long serialVersionUID = 3171423303843174723L;
......
package application;
import com.google.common.math.StatsAccumulator;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.*;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.*;
......@@ -18,19 +16,16 @@ import org.apache.beam.sdk.values.*;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import serialization.*;
import theodolite.commons.beam.AbstractPipeline;
import theodolite.commons.beam.ConfigurationKeys;
import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader;
import theodolite.commons.beam.kafka.KafkaGenericReader;
import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
import theodolite.commons.beam.kafka.KafkaWriterTransformation;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.AggregatedActivePowerRecord;
/**
* Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To
* execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload
......@@ -62,7 +57,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
final Duration gracePeriod = Duration.standardSeconds(grace);
// Build kafka configuration
final HashMap consumerConfig = buildConsumerConfig();
final HashMap<String, Object> consumerConfig = buildConsumerConfig();
final HashMap<String, Object> configurationConfig = configurationConfig(config);
// Set Coders for Classes that will be distributed
......@@ -70,36 +65,35 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
registerCoders(cr);
// Read from Kafka
final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>>
final KafkaActivePowerTimestampReader
kafkaActivePowerRecordReader =
new KafkaActivePowerRecordReader(bootstrapServer, inputTopic, consumerConfig);
// final PTransform<PBegin, PCollection<KV<String, AggregatedActivePowerRecord>>>
// kafkaAggregatedPowerRecordReader =
// new KafkaGenericReader<String, AggregatedActivePowerRecord>
// (bootstrapServer, feedbackTopic, configurationConfig, StringDeserializer.class,
// (Class<KafkaAvroDeserializer>) KafkaAvroDeserializer.class);
new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig);
// Transform into AggregatedActivePowerRecords into ActivePowerRecords
final AggregatedToActive aggregatedToActive = new AggregatedToActive();
// Write to Kafka
final KafkaWriterTransformation kafkaWriter =
new KafkaWriterTransformation(bootstrapServer, outputTopic, StringSerializer.class);
final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaOutput =
new KafkaWriterTransformation<>(
bootstrapServer, outputTopic, AggregatedActivePowerRecordSerializer.class);
final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaFeedback =
new KafkaWriterTransformation<>(
bootstrapServer, feedbackTopic, AggregatedActivePowerRecordSerializer.class);
// Apply pipeline transformations
// Read from Kafka
final PCollection<KV<String, ActivePowerRecord>> values = this
.apply(kafkaActivePowerRecordReader)
.apply("Read Windows", Window.into(FixedWindows.of(duration)))
.apply("Set trigger for input", Window
.<KV<String, ActivePowerRecord>>configure()
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(triggerDelay)))
.withAllowedLateness(gracePeriod)
.discardingFiredPanes());
.apply("Read Windows", Window.into(FixedWindows.of(duration)))
.apply("Set trigger for input", Window
.<KV<String, ActivePowerRecord>>configure()
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(triggerDelay)))
.withAllowedLateness(gracePeriod)
.discardingFiredPanes());
// Read the results of earlier aggregations.
final PCollection<KV<String, ActivePowerRecord>> aggregationsInput = this
......@@ -179,8 +173,8 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
.apply("Filter out null values",
Filter.by(filterNullValues));
SetIdForAggregated setIdForAggregated = new SetIdForAggregated();
SetKeyToGroup setKeyToGroup = new SetKeyToGroup();
final SetIdForAggregated setIdForAggregated = new SetIdForAggregated();
final SetKeyToGroup setKeyToGroup = new SetKeyToGroup();
// Aggregate for every sensor group of the current level
final PCollection<KV<String, AggregatedActivePowerRecord>>
......@@ -200,19 +194,10 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
.apply("Set the Identifier in AggregatedActivePowerRecord",
MapElements.via(setIdForAggregated));
aggregations.apply("Write to aggregation results",
KafkaIO.<String, AggregatedActivePowerRecord>write()
.withBootstrapServers(bootstrapServer)
.withTopic(outputTopic)
.withKeySerializer(StringSerializer.class)
.withValueSerializer(AggregatedActivePowerRecordSerializer.class));
aggregations.apply("Write to aggregation results", kafkaOutput);
aggregations
.apply("Write to feedback topic", KafkaIO.<String, AggregatedActivePowerRecord>write()
.withBootstrapServers(bootstrapServer)
.withTopic(feedbackTopic)
.withKeySerializer(StringSerializer.class)
.withValueSerializer(AggregatedActivePowerRecordSerializer.class));
.apply("Write to feedback topic", kafkaFeedback);
}
......@@ -222,7 +207,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
*
* @return the build configuration.
*/
public HashMap<String, Object> configurationConfig(Configuration config) {
public HashMap<String, Object> configurationConfig(final Configuration config) {
final HashMap<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
......@@ -230,7 +215,6 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
config
.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
// final String applicationName = config.getString( ConfigurationKeys.APPLICATION_NAME) + "-configuration";
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config
.getString(ConfigurationKeys.APPLICATION_NAME) + "-configuration");
return consumerConfig;
......@@ -244,7 +228,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
*/
private static void registerCoders(final CoderRegistry cr) {
cr.registerCoderForClass(ActivePowerRecord.class,
AvroCoder.of(ActivePowerRecord.class));
AvroCoder.of(ActivePowerRecord.class));
cr.registerCoderForClass(AggregatedActivePowerRecord.class,
new AggregatedActivePowerRecordCoder());
cr.registerCoderForClass(Set.class, SetCoder.of(StringUtf8Coder.of()));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment