Commit 363504ab authored by Lorenz Boguhn

Add uc3-beam-flink with abstract Service + Pipeline

parent 6cccc5ee
Merge request !187: Migrate Beam benchmark implementation
@@ -7,6 +7,7 @@ include 'beam-commons'
include 'uc1-beam'
include 'uc2-beam'
+include 'uc3-beam'
include 'uc1-load-generator'
include 'uc1-kstreams'
...
plugins {
-  id 'theodolite.kstreams'
+  id 'theodolite.beam'
}
allprojects {
  repositories {
    maven {
      url 'https://packages.confluent.io/maven/'
    }
    mavenCentral()
  }
}
dependencies {
  // Note: beam-sdks-java-core (2.22.0) and beam-runners-flink-1.12 (2.27.0) pin
  // different Beam versions; Beam modules are normally kept on a single version.
  compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
  compile group: 'org.apache.beam', name: 'beam-runners-flink-1.12', version: '2.27.0'
  compile('org.apache.beam:beam-sdks-java-io-kafka:2.22.0') {
    exclude group: 'org.apache.kafka', module: 'kafka-clients'
  }
  compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
  runtime 'org.apache.beam:beam-runners-direct-java:2.22.0'
  runtime 'org.slf4j:slf4j-api:1.7.32'
  runtime 'org.slf4j:slf4j-jdk14:1.7.32'
  compile project(':uc3-beam')
}
// This is the path of the main class, stored within ./src/main/java/
-mainClassName = 'application.Uc3ApplicationBeam'
+mainClassName = 'application.Uc3BeamFlink'
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
+import theodolite.commons.beam.AbstractBeamService;
import titan.ccp.model.records.ActivePowerRecord;
/**
@@ -40,25 +41,14 @@ import titan.ccp.model.records.ActivePowerRecord;
 * ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File under Standard
 * Input Output in Common in the Run Configuration, and start via Eclipse Run.
 */
-public final class Uc3ApplicationBeam {
-
-  private static final String JOB_NAME = "Uc3Application";
-  private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
-  private static final String INPUT = "INPUT";
-  private static final String OUTPUT = "OUTPUT";
-  private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
-  private static final String YES = "true";
-  private static final String USE_AVRO_READER = YES;
-  private static final String AUTO_COMMIT_CONFIG = YES;
-  private static final String KAFKA_WINDOW_DURATION_DAYS = "KAFKA_WINDOW_DURATION_MINUTES";
-  private static final String AGGREGATION_ADVANCE_DAYS = "AGGREGATION_ADVANCE_DAYS";
-  private static final String TRIGGER_INTERVAL = "TRIGGER_INTERVAL";
+public final class Uc3BeamFlink extends AbstractBeamService {

  /**
   * Private constructor to avoid instantiation.
   */
-  private Uc3ApplicationBeam() {
-    throw new UnsupportedOperationException();
+  private Uc3BeamFlink(final String[] args) { //NOPMD
+    super(args);
+    this.options.setRunner(FlinkRunner.class);
  }
/**
@@ -66,122 +56,12 @@ public final class Uc3ApplicationBeam {
*/
  public static void main(final String[] args) {

-    // Set Configuration for Windows
-    final int windowDuration = Integer.parseInt(
-        System.getenv(KAFKA_WINDOW_DURATION_DAYS) == null
-            ? "30" : System.getenv(KAFKA_WINDOW_DURATION_DAYS));
-    final Duration duration = Duration.standardDays(windowDuration);
-    final int aggregationAdvance = Integer.parseInt(
-        System.getenv(AGGREGATION_ADVANCE_DAYS) == null
-            ? "1" : System.getenv(AGGREGATION_ADVANCE_DAYS));
-    final Duration advance = Duration.standardDays(aggregationAdvance);
-    final int triggerInterval = Integer.parseInt(
-        System.getenv(TRIGGER_INTERVAL) == null
-            ? "15" : System.getenv(TRIGGER_INTERVAL));
-    final Duration triggerDelay = Duration.standardSeconds(triggerInterval);
-
-    // Set Configuration for Kafka
-    final String bootstrapServer =
-        System.getenv(BOOTSTRAP) == null ? "my-confluent-cp-kafka:9092"
-            : System.getenv(BOOTSTRAP);
-    final String inputTopic = System.getenv(INPUT) == null ? "input" : System.getenv(INPUT);
-    final String outputTopic = System.getenv(OUTPUT) == null ? "output" : System.getenv(OUTPUT);
-    final String schemaRegistryUrl =
-        System.getenv(SCHEMA_REGISTRY) == null ? "http://my-confluent-cp-schema-registry:8081"
-            : System.getenv(SCHEMA_REGISTRY);
-
-    final Map<String, Object> consumerConfig = buildConsumerConfig(schemaRegistryUrl);
-    final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
-
-    final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
-    options.setRunner(FlinkRunner.class);
-    options.setJobName(JOB_NAME);
-    final Pipeline pipeline = Pipeline.create(options);
-    final CoderRegistry cr = pipeline.getCoderRegistry();
-
-    registerCoders(cr);
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
-        KafkaIO.<String, ActivePowerRecord>read()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(inputTopic)
-            .withKeyDeserializer(StringDeserializer.class)
-            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
-                AvroCoder.of(ActivePowerRecord.class))
-            .withConsumerConfigUpdates(consumerConfig)
-            // Set TimestampPolicy for event time
-            .withTimestampPolicyFactory(
-                (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark))
-            .withoutMetadata();
-
-    // Apply pipeline transformations
-    // Read from Kafka
-    pipeline.apply(kafka)
-        // Map to correct time format
-        .apply(MapElements.via(new MapTimeFormat()))
-        // Apply a sliding window
-        .apply(Window
-            .<KV<HourOfDayKey, ActivePowerRecord>>into(SlidingWindows.of(duration).every(advance))
-            .triggering(AfterWatermark.pastEndOfWindow()
-                .withEarlyFirings(
-                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(triggerDelay)))
-            .withAllowedLateness(Duration.ZERO)
-            .accumulatingFiredPanes())
-        // Aggregate per window for every key
-        .apply(Combine.<HourOfDayKey, ActivePowerRecord, Stats>perKey(new StatsAggregation()))
-        .setCoder(KvCoder.of(new HourOfDaykeyCoder(), SerializableCoder.of(Stats.class)))
+    Uc3BeamFlink uc3BeamFlink = new Uc3BeamFlink(args);
-        // Map into correct output format
-        .apply(MapElements
-            .via(new SimpleFunction<KV<HourOfDayKey, Stats>, KV<String, String>>() {
-              @Override
-              public KV<String, String> apply(final KV<HourOfDayKey, Stats> kv) {
-                return KV.of(keyFactory.getSensorId(kv.getKey()), kv.getValue().toString());
-              }
-            }))
-        // Write to Kafka
-        .apply(KafkaIO.<String, String>write()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(outputTopic)
-            .withKeySerializer(StringSerializer.class)
-            .withValueSerializer(StringSerializer.class));
+    Uc3BeamPipeline pipeline =
+        new Uc3BeamPipeline(uc3BeamFlink.options, uc3BeamFlink.getConfig());

    pipeline.run().waitUntilFinish();
  }
-  /**
-   * Builds a configuration for a Kafka consumer.
-   *
-   * @param schemaRegistryUrl the URL of the Schema Registry.
-   * @return the configuration.
-   */
-  public static Map<String, Object> buildConsumerConfig(final String schemaRegistryUrl) {
-    // Set consumer configuration for the schema registry and commits back to Kafka
-    final HashMap<String, Object> consumerConfig = new HashMap<>();
-    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG);
-    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-    consumerConfig.put("schema.registry.url", schemaRegistryUrl);
-    consumerConfig.put("specific.avro.reader", USE_AVRO_READER);
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, JOB_NAME);
-    return consumerConfig;
-  }
-
-  /**
-   * Registers coders for all classes that need them.
-   *
-   * @param cr the CoderRegistry of the pipeline.
-   */
-  private static void registerCoders(final CoderRegistry cr) {
-    cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
-    cr.registerCoderForClass(HourOfDayKey.class, new HourOfDaykeyCoder());
-    cr.registerCoderForClass(StatsAggregation.class,
-        SerializableCoder.of(StatsAggregation.class));
-    cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
-  }
}
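The new Uc3BeamFlink delegates argument parsing, configuration loading, and job naming to AbstractBeamService, and Uc3BeamPipeline builds on AbstractPipeline; neither base class is part of this diff. The following is a minimal sketch of what the two classes in beam-commons plausibly provide, inferred purely from their call sites above (this.options, getConfig(), bootstrapServer, inputTopic, buildConsumerConfig()); treat names and behavior as assumptions, not the committed implementation:

package theodolite.commons.beam;

import java.util.Properties;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.builder.fluent.Configurations;
import org.apache.commons.configuration2.ex.ConfigurationException;

// Hypothetical sketch of the service base class referenced above.
public abstract class AbstractBeamService {

  // Exposed to subclasses so they can pick a runner, as Uc3BeamFlink does.
  protected final PipelineOptions options;

  private final Configuration config;

  protected AbstractBeamService(final String[] args) {
    try {
      // Assumption: the service configuration comes from application.properties.
      this.config = new Configurations().properties("application.properties");
    } catch (final ConfigurationException e) {
      throw new IllegalStateException("Cannot load application.properties", e);
    }
    this.options = PipelineOptionsFactory.fromArgs(args).create();
    this.options.setJobName(this.config.getString("application.name"));
  }

  public Configuration getConfig() {
    return this.config;
  }
}

// Companion sketch (in its own file): AbstractPipeline makes the shared Kafka
// settings available to concrete pipelines such as Uc3BeamPipeline.
public abstract class AbstractPipeline extends Pipeline {

  protected final String bootstrapServer;
  protected final String inputTopic;

  private final Configuration config;

  protected AbstractPipeline(final PipelineOptions options, final Configuration config) {
    super(options);
    this.config = config;
    this.bootstrapServer = config.getString("kafka.bootstrap.servers");
    this.inputTopic = config.getString("kafka.input.topic");
  }

  // Builds the consumer settings used by the Kafka reader transformation.
  protected Properties buildConsumerConfig() {
    final Properties props = new Properties();
    props.put("enable.auto.commit", "true");
    props.put("auto.offset.reset", "earliest");
    props.put("schema.registry.url", this.config.getString("schema.registry.url"));
    props.put("specific.avro.reader", "true");
    return props;
  }
}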
application.name=theodolite-uc3-application
application.version=0.0.1

kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
kafka.window.duration.minutes=1

schema.registry.url=http://localhost:8081

aggregation.duration.days=30
aggregation.advance.days=1
trigger.interval=15

num.threads=1
commit.interval.ms=1000
cache.max.bytes.buffering=-1

specific.avro.reader=true
enable.auto.commit.config=true
auto.offset.reset.config=earliest
\ No newline at end of file
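Uc3BeamPipeline below reads these values through theodolite.commons.beam.ConfigurationKeys, which this commit does not show. A hedged sketch of the constants this use case would need, with names assumed to mirror the property keys above:

package theodolite.commons.beam;

// Hypothetical constants mirroring the property keys above; the committed
// ConfigurationKeys class is not part of this diff.
public final class ConfigurationKeys {

  public static final String APPLICATION_NAME = "application.name";
  public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
  public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
  public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
  public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
  public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
  public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
  public static final String TRIGGER_INTERVAL = "trigger.interval";

  private ConfigurationKeys() {}
}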
plugins {
  id 'theodolite.beam'
}
@@ -7,7 +7,6 @@ import org.apache.beam.sdk.coders.DefaultCoder;
/**
 * Composed key of an hour of the day and a sensor id.
 */
-@DefaultCoder(AvroCoder.class)
public class HourOfDayKey {
...
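The removed @DefaultCoder annotation is superseded by the custom HourOfDaykeyCoder that both pipelines register explicitly. That coder is not included in this commit; the following is a minimal sketch of the shape such a coder could take, assuming HourOfDayKey exposes its two fields via getHourOfDay()/getSensorId() and a two-argument constructor (both assumptions):

package application;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;

// Hypothetical sketch only: the committed HourOfDaykeyCoder is not part of
// this diff, and the HourOfDayKey accessors are assumed.
public class HourOfDaykeyCoder extends CustomCoder<HourOfDayKey> {

  private static final VarIntCoder HOUR_CODER = VarIntCoder.of();
  private static final StringUtf8Coder SENSOR_CODER = StringUtf8Coder.of();

  @Override
  public void encode(final HourOfDayKey key, final OutputStream outStream) throws IOException {
    // Encode both key components back to back.
    HOUR_CODER.encode(key.getHourOfDay(), outStream);
    SENSOR_CODER.encode(key.getSensorId(), outStream);
  }

  @Override
  public HourOfDayKey decode(final InputStream inStream) throws IOException {
    final int hourOfDay = HOUR_CODER.decode(inStream);
    final String sensorId = SENSOR_CODER.decode(inStream);
    return new HourOfDayKey(hourOfDay, sensorId);
  }
}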
package application;

import com.google.common.math.Stats;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;

/**
 * Maps a {@link KV} of {@link HourOfDayKey} and {@link Stats} to a {@link KV} of the
 * corresponding sensor id and the string representation of the stats.
 */
public class HourOfDayWithStats
    extends SimpleFunction<KV<HourOfDayKey, Stats>, KV<String, String>> {

  private final HourOfDayKeyFactory keyFactory = new HourOfDayKeyFactory();

  @Override
  public KV<String, String> apply(final KV<HourOfDayKey, Stats> kv) {
    return KV.of(keyFactory.getSensorId(kv.getKey()), kv.getValue().toString());
  }
}
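A quick usage sketch (not part of the commit) showing what this mapping produces; it assumes HourOfDayKeyFactory offers createKey(sensorId, dateTime) as the counterpart of getSensorId, which is not shown in this diff:

package application;

import com.google.common.math.Stats;
import java.time.LocalDateTime;
import org.apache.beam.sdk.values.KV;

// Hypothetical usage example; createKey(...) is an assumed factory method.
public final class HourOfDayWithStatsExample {

  private HourOfDayWithStatsExample() {}

  public static void main(final String[] args) {
    final HourOfDayKey key =
        new HourOfDayKeyFactory().createKey("sensor1", LocalDateTime.of(2021, 1, 1, 13, 0));
    final KV<String, String> out =
        new HourOfDayWithStats().apply(KV.of(key, Stats.of(1.0, 2.0, 3.0)));
    // out.getKey() is "sensor1"; out.getValue() is the Stats string
    // representation, i.e. count, mean etc. of all values in hour 13.
    System.out.println(out.getKey() + " -> " + out.getValue());
  }
}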
package application;

import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import java.util.Properties;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import theodolite.commons.beam.AbstractPipeline;
import theodolite.commons.beam.ConfigurationKeys;
import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader;
import theodolite.commons.beam.kafka.KafkaWriterTransformation;
import titan.ccp.model.records.ActivePowerRecord;
/**
 * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To
 * execute locally in standalone mode, start Kafka, Zookeeper, the schema registry and the
 * workload generator using the delayed_startup.sh script. Start a Flink cluster and pass its
 * REST address using --flinkMaster as run parameter. To persist logs, add
 * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard
 * Input Output in Common in the Run Configuration, and start via Eclipse Run.
 */
public final class Uc3BeamPipeline extends AbstractPipeline {

  protected Uc3BeamPipeline(final PipelineOptions options, final Configuration config) {
    super(options, config);

    // Additional configuration values
    final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);

    final int windowDurationDays = Integer.parseInt(
        config.getString(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
    final Duration duration = Duration.standardDays(windowDurationDays);

    final int aggregationAdvance = Integer.parseInt(
        config.getString(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
    final Duration aggregationAdvanceDuration = Duration.standardDays(aggregationAdvance);

    final int triggerInterval = Integer.parseInt(
        config.getString(ConfigurationKeys.TRIGGER_INTERVAL));
    final Duration triggerDelay = Duration.standardSeconds(triggerInterval);

    // Build the Kafka consumer configuration
    final Properties consumerConfig = buildConsumerConfig();

    // Set coders for classes that will be distributed
    final CoderRegistry cr = this.getCoderRegistry();
    registerCoders(cr);
    // Read from Kafka: the reader transformation from beam-commons already
    // configures the Avro deserializer and the event-time timestamp policy.
    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>>
        kafkaActivePowerRecordReader =
            new KafkaActivePowerRecordReader(bootstrapServer, inputTopic, consumerConfig);

    final MapTimeFormat mapTimeFormat = new MapTimeFormat();
    final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats();

    // Write to Kafka
    final PTransform<PCollection<KV<String, String>>, POutput> kafkaWriter =
        new KafkaWriterTransformation(bootstrapServer, outputTopic, StringSerializer.class);

    this.apply(kafkaActivePowerRecordReader)
        // Map to the correct time format
        .apply(MapElements.via(mapTimeFormat))
        // Apply a sliding window
        .apply(Window
            .<KV<HourOfDayKey, ActivePowerRecord>>into(
                SlidingWindows.of(duration).every(aggregationAdvanceDuration))
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(triggerDelay)))
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes())
        // Aggregate per window for every key
        .apply(Combine.<HourOfDayKey, ActivePowerRecord, Stats>perKey(new StatsAggregation()))
        .setCoder(KvCoder.of(new HourOfDaykeyCoder(), SerializableCoder.of(Stats.class)))
        // Map into the correct output format
        .apply(MapElements.via(hourOfDayWithStats))
        // Write to Kafka
        .apply(kafkaWriter);
  }
  /**
   * Registers coders for all classes that are distributed between workers.
   *
   * @param cr the CoderRegistry of the pipeline.
   */
  private static void registerCoders(final CoderRegistry cr) {
    cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
    cr.registerCoderForClass(HourOfDayKey.class, new HourOfDaykeyCoder());
    cr.registerCoderForClass(StatsAggregation.class,
        SerializableCoder.of(StatsAggregation.class));
    cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
  }
}
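For completeness, a hypothetical local smoke run of this pipeline on Beam's DirectRunner, which build.gradle already declares as a runtime dependency. The configuration keys and the BaseConfiguration wiring are assumptions mirroring application.properties above; the commit itself only wires the Flink runner:

package application;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.commons.configuration2.BaseConfiguration;

// Hypothetical helper, not part of the commit: runs Uc3BeamPipeline locally.
public final class Uc3BeamDirectRun {

  private Uc3BeamDirectRun() {}

  public static void main(final String[] args) {
    final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    options.setRunner(DirectRunner.class); // swap FlinkRunner for local execution

    // Assumed configuration keys, matching application.properties above.
    final BaseConfiguration config = new BaseConfiguration();
    config.setProperty("kafka.bootstrap.servers", "localhost:9092");
    config.setProperty("kafka.input.topic", "input");
    config.setProperty("kafka.output.topic", "output");
    config.setProperty("schema.registry.url", "http://localhost:8081");
    config.setProperty("aggregation.duration.days", "30");
    config.setProperty("aggregation.advance.days", "1");
    config.setProperty("trigger.interval", "15");

    new Uc3BeamPipeline(options, config).run().waitUntilFinish();
  }
}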