Commit c8034d6c authored by Lorenz Boguhn

Migrate uc3-beam-flink

parent 85f1f108
1 merge request: !187 Migrate Beam benchmark implementation
Showing 450 additions and 0 deletions
@@ -19,6 +19,7 @@ include 'uc2-beam-flink'
 include 'uc3-load-generator'
 include 'uc3-kstreams'
 include 'uc3-flink'
+include 'uc3-beam-flink'
 include 'uc4-load-generator'
 include 'uc4-kstreams'
...
FROM openjdk:8-slim
ADD build/distributions/uc4-application-flink.tar /
CMD /uc4-application-flink/bin/uc4-application-flink --runner=FlinkRunner --flinkMaster=flink-jobmanager:8081 --streaming --parallelism=$PARALLELISM --disableMetrics=true --fasterCopy --stateBackend=rocksdb --stateBackendStoragePath=file:///data/flink/checkpoints
\ No newline at end of file
plugins {
id 'theodolite.kstreams'
}
allprojects {
repositories {
maven {
url 'https://packages.confluent.io/maven/'
}
mavenCentral()
}
}
dependencies {
compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
compile group: 'org.apache.beam', name: 'beam-runners-flink-1.12', version: '2.27.0'
compile('org.apache.beam:beam-sdks-java-io-kafka:2.22.0'){
exclude group: 'org.apache.kafka', module: 'kafka-clients'
}
compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
runtime 'org.apache.beam:beam-runners-direct-java:2.22.0'
runtime 'org.slf4j:slf4j-api:1.7.32'
runtime 'org.slf4j:slf4j-jdk14:1.7.32'
}
// Fully qualified name of the main class, located under ./src/main/java/
mainClassName = 'application.Uc3ApplicationBeam'
package application;
import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
/**
 * {@link TimestampPolicy} that assigns event time based on the timestamp of the record value.
*/
public class EventTimePolicy
extends TimestampPolicy<String, titan.ccp.model.records.ActivePowerRecord> {
protected Instant currentWatermark;
public EventTimePolicy(final Optional<Instant> previousWatermark) {
this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
}
@Override
public Instant getTimestampForRecord(final PartitionContext ctx,
final KafkaRecord<String, titan.ccp.model.records.ActivePowerRecord> record) {
this.currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
return this.currentWatermark;
}
@Override
public Instant getWatermark(final PartitionContext ctx) {
return this.currentWatermark;
}
}
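The policy is meant to be handed to KafkaIO through a TimestampPolicyFactory. The following minimal sketch shows that wiring; it mirrors the configuration used in Uc3ApplicationBeam further below, and the bootstrap server and topic names are placeholders only:
// Sketch: plugging EventTimePolicy into a KafkaIO source via a TimestampPolicyFactory.
KafkaIO.<String, titan.ccp.model.records.ActivePowerRecord>read()
    .withBootstrapServers("localhost:9092") // placeholder
    .withTopic("input") // placeholder
    .withTimestampPolicyFactory(
        (topicPartition, previousWatermark) -> new EventTimePolicy(previousWatermark));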
package application;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
/**
* Composed key of an hour of the day and a sensor id.
*/
@DefaultCoder(AvroCoder.class)
public class HourOfDayKey {
private final int hourOfDay;
private final String sensorId;
public HourOfDayKey(final int hourOfDay, final String sensorId) {
this.hourOfDay = hourOfDay;
this.sensorId = sensorId;
}
public int getHourOfDay() {
return this.hourOfDay;
}
public String getSensorId() {
return this.sensorId;
}
@Override
public String toString() {
return this.sensorId + ";" + this.hourOfDay;
}
}
package application;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* {@link StatsKeyFactory} for {@link HourOfDayKey}.
*/
public class HourOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey>, Serializable {
private static final long serialVersionUID = 1L;
@Override
public HourOfDayKey createKey(final String sensorId, final LocalDateTime dateTime) {
final int hourOfDay = dateTime.getHour();
return new HourOfDayKey(hourOfDay, sensorId);
}
@Override
public String getSensorId(final HourOfDayKey key) {
return key.getSensorId();
}
}
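A brief usage sketch (the sensor id and date are illustrative values only): the factory derives the hour of day from a LocalDateTime, pairs it with the sensor id, and can recover the sensor id from the composed key.
// Illustrative only: compose and decompose an HourOfDayKey.
final StatsKeyFactory<HourOfDayKey> factory = new HourOfDayKeyFactory();
final HourOfDayKey key = factory.createKey("sensor-1", LocalDateTime.of(2021, 6, 1, 14, 30));
// key.getHourOfDay() == 14 and factory.getSensorId(key) returns "sensor-1"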
package application;
import org.apache.kafka.common.serialization.Serde;
import titan.ccp.common.kafka.simpleserdes.BufferSerde;
import titan.ccp.common.kafka.simpleserdes.ReadBuffer;
import titan.ccp.common.kafka.simpleserdes.SimpleSerdes;
import titan.ccp.common.kafka.simpleserdes.WriteBuffer;
/**
* {@link BufferSerde} for a {@link HourOfDayKey}. Use the {@link #create()} method to create a new
* Kafka {@link Serde}.
*/
public class HourOfDayKeySerde implements BufferSerde<HourOfDayKey> {
@Override
public void serialize(final WriteBuffer buffer, final HourOfDayKey data) {
buffer.putInt(data.getHourOfDay());
buffer.putString(data.getSensorId());
}
@Override
public HourOfDayKey deserialize(final ReadBuffer buffer) {
final int hourOfDay = buffer.getInt();
final String sensorId = buffer.getString();
return new HourOfDayKey(hourOfDay, sensorId);
}
public static Serde<HourOfDayKey> create() {
return SimpleSerdes.create(new HourOfDayKeySerde());
}
}
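A round-trip sketch of the resulting Kafka Serde (the topic name passed to serialize/deserialize is a placeholder that the serde does not interpret):
// Sketch: serialize an HourOfDayKey to bytes and read it back.
final Serde<HourOfDayKey> serde = HourOfDayKeySerde.create();
final byte[] bytes = serde.serializer().serialize("any-topic", new HourOfDayKey(14, "sensor-1"));
final HourOfDayKey restored = serde.deserializer().deserialize("any-topic", bytes);
// restored.getHourOfDay() == 14 and restored.getSensorId().equals("sensor-1")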
package application;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.kafka.common.serialization.Serde;
/**
 * Wrapper class that encapsulates a {@link HourOfDayKeySerde} in an org.apache.beam.sdk.coders.Coder.
*/
@SuppressWarnings("serial")
public class HourOfDaykeyCoder extends Coder<HourOfDayKey> implements Serializable {
private transient Serde<HourOfDayKey> innerSerde = HourOfDayKeySerde.create();
@Override
public void encode(final HourOfDayKey value, final OutputStream outStream)
throws CoderException, IOException {
if (this.innerSerde == null) {
this.innerSerde = HourOfDayKeySerde.create();
}
final byte[] bytes = this.innerSerde.serializer().serialize("ser", value);
final byte[] sizeinBytes = ByteBuffer.allocate(4).putInt(bytes.length).array();
outStream.write(sizeinBytes);
outStream.write(bytes);
}
@Override
public HourOfDayKey decode(final InputStream inStream) throws CoderException, IOException {
if (this.innerSerde == null) {
this.innerSerde = HourOfDayKeySerde.create();
}
final byte[] sizeinBytes = new byte[4];
inStream.read(sizeinBytes);
final int size = ByteBuffer.wrap(sizeinBytes).getInt();
final byte[] bytes = new byte[size];
inStream.read(bytes);
return this.innerSerde.deserializer().deserialize("deser", bytes);
}
@Override
public List<? extends Coder<?>> getCoderArguments() {
return null;
}
@Override
public void verifyDeterministic() throws NonDeterministicException {
}
}
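A hypothetical round-trip through the coder using plain java.io byte-array streams (a test-style snippet, not part of the commit; assumes it runs inside a method that declares throws IOException):
// Sketch: encode an HourOfDayKey to a byte stream and decode it again.
final HourOfDaykeyCoder coder = new HourOfDaykeyCoder();
final ByteArrayOutputStream out = new ByteArrayOutputStream();
coder.encode(new HourOfDayKey(14, "sensor-1"), out);
final HourOfDayKey decoded = coder.decode(new ByteArrayInputStream(out.toByteArray()));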
package application;
import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import java.io.Serializable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import titan.ccp.model.records.ActivePowerRecord;
/**
 * Aggregation class for {@link ActivePowerRecord}s. Accumulates their valueInW into a {@link StatsAccumulator}.
*/
@DefaultCoder(AvroCoder.class)
public class StatsAggregation extends CombineFn<ActivePowerRecord, StatsAccumulator, Stats>
implements Serializable {
private static final long serialVersionUID = 1L;
@Override
public StatsAccumulator createAccumulator() {
return new StatsAccumulator();
}
@Override
public StatsAccumulator addInput(final StatsAccumulator accum, final ActivePowerRecord input) {
accum.add(input.getValueInW());
return accum;
}
@Override
public StatsAccumulator mergeAccumulators(final Iterable<StatsAccumulator> accums) {
final StatsAccumulator merged = this.createAccumulator();
for (final StatsAccumulator accum : accums) {
merged.addAll(accum.snapshot());
}
return merged;
}
@Override
public Stats extractOutput(final StatsAccumulator accum) {
return accum.snapshot();
}
}
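The CombineFn can be exercised locally without a pipeline. The sketch below assumes the Avro-generated ActivePowerRecord constructor takes (identifier, timestamp, valueInW); the record values are made up for illustration:
// Sketch: accumulate two readings and extract the summary statistics.
final StatsAggregation agg = new StatsAggregation();
StatsAccumulator acc = agg.createAccumulator();
acc = agg.addInput(acc, new ActivePowerRecord("sensor-1", 0L, 100.0)); // assumed constructor
acc = agg.addInput(acc, new ActivePowerRecord("sensor-1", 1L, 200.0));
final Stats stats = agg.extractOutput(acc); // stats.mean() == 150.0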
package application;
import java.time.LocalDateTime;
/**
* Factory interface for creating a stats key from a sensor id and a {@link LocalDateTime} object
* and vice versa.
*
* @param <T> Type of the key
*/
public interface StatsKeyFactory<T> {
T createKey(String sensorId, LocalDateTime dateTime);
String getSensorId(T key);
}
package application;
import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.HashMap;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import titan.ccp.model.records.ActivePowerRecord;
/**
 * Implementation of the use case Aggregation based on Time Attributes using Apache Beam with the
 * Flink Runner. To run locally in standalone mode, start Kafka, ZooKeeper, the Schema Registry and
 * the workload generator using the delayed_startup.sh script, and configure the Kafka, ZooKeeper
 * and Schema Registry URLs accordingly. Start a Flink cluster and pass its REST address via
 * --flinkMaster as a run parameter. To persist logs, add
 * ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File under Standard
 * Input Output in Common in the Run Configuration, then start via Eclipse Run.
*/
public class Uc3ApplicationBeam {
@SuppressWarnings("serial")
public static void main(final String[] args) {
// Set configuration for the sliding windows
final int windowDuration = Integer.parseInt(
System.getenv("KAFKA_WINDOW_DURATION_DAYS") != null
? System.getenv("KAFKA_WINDOW_DURATION_DAYS")
: "30");
final Duration duration = Duration.standardDays(windowDuration);
final int aggregationAdvance = Integer.parseInt(
System.getenv("AGGREGATION_ADVANCE_DAYS") != null
? System.getenv("AGGREGATION_ADVANCE_DAYS")
: "1");
final Duration advance = Duration.standardDays(aggregationAdvance);
final int triggerInterval = Integer.parseInt(
System.getenv("TRIGGER_INTERVAL") != null
? System.getenv("TRIGGER_INTERVAL")
: "15");
final Duration triggerDelay = Duration.standardSeconds(triggerInterval);
// Set configuration for Kafka
final String bootstrapServer =
System.getenv("KAFKA_BOOTSTRAP_SERVERS") != null ? System.getenv("KAFKA_BOOTSTRAP_SERVERS")
: "my-confluent-cp-kafka:9092";
final String inputTopic = System.getenv("INPUT") != null ? System.getenv("INPUT") : "input";
final String outputTopic = System.getenv("OUTPUT") != null ? System.getenv("OUTPUT") : "output";
final String schemaRegistryURL =
System.getenv("SCHEMA_REGISTRY_URL") != null ? System.getenv("SCHEMA_REGISTRY_URL")
: "http://my-confluent-cp-schema-registry:8081";
// Set consumer configuration for the Schema Registry and offset commits back to Kafka
final HashMap<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put("schema.registry.url", schemaRegistryURL);
consumerConfig.put("specific.avro.reader", "true");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application");
final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setRunner(FlinkRunner.class);
options.setJobName("ucapplication");
final Pipeline pipeline = Pipeline.create(options);
final CoderRegistry cr = pipeline.getCoderRegistry();
cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
cr.registerCoderForClass(HourOfDayKey.class, new HourOfDaykeyCoder());
cr.registerCoderForClass(StatsAggregation.class,
SerializableCoder.of(StatsAggregation.class));
cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
@SuppressWarnings({"rawtypes", "unchecked"})
final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
KafkaIO.<String, ActivePowerRecord>read()
.withBootstrapServers(bootstrapServer)
.withTopic(inputTopic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
AvroCoder.of(ActivePowerRecord.class))
.withConsumerConfigUpdates(consumerConfig)
// Set TimeStampPolicy for event time
.withTimestampPolicyFactory(
(tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark))
.withoutMetadata();
// Apply pipeline transformations
// Read from Kafka
pipeline.apply(kafka)
// Map to correct time format
.apply(MapElements.via(
new SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey, ActivePowerRecord>>() {
final ZoneId zone = ZoneId.of("Europe/Paris");
@Override
public KV<application.HourOfDayKey, ActivePowerRecord> apply(
final KV<String, ActivePowerRecord> kv) {
final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
return KV.of(keyFactory.createKey(kv.getValue().getIdentifier(), dateTime),
kv.getValue());
}
}))
// Apply a sliding window
.apply(Window
.<KV<HourOfDayKey, ActivePowerRecord>>into(SlidingWindows.of(duration).every(advance))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(triggerDelay)))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes())
// Aggregate per window for every key
.apply(Combine.<HourOfDayKey, ActivePowerRecord, Stats>perKey(
new StatsAggregation()))
.setCoder(KvCoder.of(new HourOfDaykeyCoder(), SerializableCoder.of(Stats.class)))
// Map into correct output format
.apply(MapElements
.via(new SimpleFunction<KV<HourOfDayKey, Stats>, KV<String, String>>() {
@Override
public KV<String, String> apply(final KV<HourOfDayKey, Stats> kv) {
return KV.of(keyFactory.getSensorId(kv.getKey()), kv.getValue().toString());
}
}))
// Write to Kafka
.apply(KafkaIO.<String, String>write()
.withBootstrapServers(bootstrapServer)
.withTopic(outputTopic)
.withKeySerializer(StringSerializer.class)
.withValueSerializer(StringSerializer.class));
pipeline.run().waitUntilFinish();
}
}