From 3782684f7e2de55ae3a048ca31655f72bed71124 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Fri, 22 Oct 2021 10:28:31 +0200
Subject: [PATCH] Migrate uc3-beam-flink

Co-authored-by: Jan Bensien <stu128012@mail.uni-kiel.de>
---
 theodolite-benchmarks/settings.gradle         |   1 +
 .../uc3-beam-flink/Dockerfile                 |   5 +
 .../uc3-beam-flink/build.gradle               |  30 +++
 .../java/application/EventTimePolicy.java     |  34 ++++
 .../main/java/application/HourOfDayKey.java   |  35 ++++
 .../java/application/HourOfDayKeyFactory.java |  24 +++
 .../java/application/HourOfDayKeySerde.java   |  32 ++++
 .../java/application/HourOfDaykeyCoder.java   |  59 ++++++
 .../java/application/StatsAggregation.java    |  45 +++++
 .../java/application/StatsKeyFactory.java     |  17 ++
 .../java/application/Uc3ApplicationBeam.java  | 172 ++++++++++++++++++
 11 files changed, 454 insertions(+)
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/Dockerfile
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/build.gradle
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/java/application/EventTimePolicy.java
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKey.java
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKeyFactory.java
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKeySerde.java
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDaykeyCoder.java
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/java/application/StatsAggregation.java
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/java/application/StatsKeyFactory.java
 create mode 100644 theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java

diff --git a/theodolite-benchmarks/settings.gradle b/theodolite-benchmarks/settings.gradle
index 208292aaf..88448110f 100644
--- a/theodolite-benchmarks/settings.gradle
+++ b/theodolite-benchmarks/settings.gradle
@@ -19,6 +19,7 @@ include 'uc2-beam-flink'
 include 'uc3-load-generator'
 include 'uc3-kstreams'
 include 'uc3-flink'
+include 'uc3-beam-flink'
 
 include 'uc4-load-generator'
 include 'uc4-kstreams'
diff --git a/theodolite-benchmarks/uc3-beam-flink/Dockerfile b/theodolite-benchmarks/uc3-beam-flink/Dockerfile
new file mode 100644
index 000000000..f16ec202e
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/Dockerfile
@@ -0,0 +1,5 @@
+FROM openjdk:8-slim
+
+ADD build/distributions/uc3-beam-flink.tar /
+
+CMD /uc3-beam-flink/bin/uc3-beam-flink --runner=FlinkRunner --flinkMaster=flink-jobmanager:8081 --streaming --parallelism=$PARALLELISM --disableMetrics=true --fasterCopy --stateBackend=rocksdb --stateBackendStoragePath=file:///data/flink/checkpoints
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc3-beam-flink/build.gradle b/theodolite-benchmarks/uc3-beam-flink/build.gradle
new file mode 100644
index 000000000..b202c53c0
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/build.gradle
@@ -0,0 +1,30 @@
+plugins {
+  id 'theodolite.kstreams'
+}
+
+allprojects {
+  repositories {
+    maven {
+      url 'https://packages.confluent.io/maven/'
+    }
+    mavenCentral()
+  }
+}
+
+dependencies {
+  compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
+  compile group: 'org.apache.beam', name: 'beam-runners-flink-1.12', version: '2.27.0'
+
+  compile('org.apache.beam:beam-sdks-java-io-kafka:2.22.0'){
+    exclude group: 'org.apache.kafka', module: 'kafka-clients'
+  }
+  compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
+
+  runtime 'org.apache.beam:beam-runners-direct-java:2.22.0'
+  runtime 'org.slf4j:slf4j-api:1.7.32'
+  runtime 'org.slf4j:slf4j-jdk14:1.7.32'
+}
+
+
+// The fully qualified name of the application's main class, located in ./src/main/java/.
+mainClassName = 'application.Uc3ApplicationBeam'
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/EventTimePolicy.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/EventTimePolicy.java
new file mode 100644
index 000000000..993951cb3
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/EventTimePolicy.java
@@ -0,0 +1,34 @@
+package application;
+
+
+import java.util.Optional;
+import org.apache.beam.sdk.io.kafka.KafkaRecord;
+import org.apache.beam.sdk.io.kafka.TimestampPolicy;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Instant;
+
+/**
+ * {@link TimestampPolicy} that uses event time based on the timestamp of the record value.
+ */
+public class EventTimePolicy
+    extends TimestampPolicy<String, titan.ccp.model.records.ActivePowerRecord> {
+  protected Instant currentWatermark;
+
+  public EventTimePolicy(final Optional<Instant> previousWatermark) {
+    this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
+  }
+
+
+  @Override
+  public Instant getTimestampForRecord(final PartitionContext ctx,
+      final KafkaRecord<String, titan.ccp.model.records.ActivePowerRecord> record) {
+    this.currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
+    return this.currentWatermark;
+  }
+
+  @Override
+  public Instant getWatermark(final PartitionContext ctx) {
+    return this.currentWatermark;
+  }
+
+}
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKey.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKey.java
new file mode 100644
index 000000000..55ed8b535
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKey.java
@@ -0,0 +1,35 @@
+package application;
+
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+
+/**
+ * Composed key of an hour of the day and a sensor id.
+ */
+
+@DefaultCoder(AvroCoder.class)
+public class HourOfDayKey {
+
+  private final int hourOfDay;
+  private final String sensorId;
+
+  public HourOfDayKey(final int hourOfDay, final String sensorId) {
+    this.hourOfDay = hourOfDay;
+    this.sensorId = sensorId;
+  }
+
+  public int getHourOfDay() {
+    return this.hourOfDay;
+  }
+
+  public String getSensorId() {
+    return this.sensorId;
+  }
+
+  @Override
+  public String toString() {
+    return this.sensorId + ";" + this.hourOfDay;
+  }
+
+}
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKeyFactory.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKeyFactory.java
new file mode 100644
index 000000000..b993a0199
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKeyFactory.java
@@ -0,0 +1,24 @@
+package application;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
+/**
+ * {@link StatsKeyFactory} for {@link HourOfDayKey}.
+ */
+public class HourOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey>, Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public HourOfDayKey createKey(final String sensorId, final LocalDateTime dateTime) {
+    final int hourOfDay = dateTime.getHour();
+    return new HourOfDayKey(hourOfDay, sensorId);
+  }
+
+  @Override
+  public String getSensorId(final HourOfDayKey key) {
+    return key.getSensorId();
+  }
+
+}
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKeySerde.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKeySerde.java
new file mode 100644
index 000000000..a0f8e0bba
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKeySerde.java
@@ -0,0 +1,32 @@
+package application;
+
+import org.apache.kafka.common.serialization.Serde;
+import titan.ccp.common.kafka.simpleserdes.BufferSerde;
+import titan.ccp.common.kafka.simpleserdes.ReadBuffer;
+import titan.ccp.common.kafka.simpleserdes.SimpleSerdes;
+import titan.ccp.common.kafka.simpleserdes.WriteBuffer;
+
+/**
+ * {@link BufferSerde} for an {@link HourOfDayKey}. Use the {@link #create()} method to create a
+ * new Kafka {@link Serde}.
+ */
+public class HourOfDayKeySerde implements BufferSerde<HourOfDayKey> {
+
+  @Override
+  public void serialize(final WriteBuffer buffer, final HourOfDayKey data) {
+    buffer.putInt(data.getHourOfDay());
+    buffer.putString(data.getSensorId());
+  }
+
+  @Override
+  public HourOfDayKey deserialize(final ReadBuffer buffer) {
+    final int hourOfDay = buffer.getInt();
+    final String sensorId = buffer.getString();
+    return new HourOfDayKey(hourOfDay, sensorId);
+  }
+
+  public static Serde<HourOfDayKey> create() {
+    return SimpleSerdes.create(new HourOfDayKeySerde());
+  }
+
+}
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDaykeyCoder.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDaykeyCoder.java
new file mode 100644
index 000000000..31fe4da7e
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDaykeyCoder.java
@@ -0,0 +1,59 @@
+package application;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.kafka.common.serialization.Serde;
+
+/**
+ * Wrapper class that encapsulates an HourOfDayKeySerde in an org.apache.beam.sdk.coders.Coder.
+ */
+@SuppressWarnings("serial")
+public class HourOfDaykeyCoder extends Coder<HourOfDayKey> implements Serializable {
+  private transient Serde<HourOfDayKey> innerSerde = HourOfDayKeySerde.create();
+
+  @Override
+  public void encode(final HourOfDayKey value, final OutputStream outStream)
+      throws CoderException, IOException {
+    if (this.innerSerde == null) {
+      this.innerSerde = HourOfDayKeySerde.create();
+    }
+    final byte[] bytes = this.innerSerde.serializer().serialize("ser", value);
+    // Length-prefix the payload so decode() knows how many bytes to consume.
+    final byte[] sizeInBytes = ByteBuffer.allocate(4).putInt(bytes.length).array();
+    outStream.write(sizeInBytes);
+    outStream.write(bytes);
+  }
+
+  @Override
+  public HourOfDayKey decode(final InputStream inStream) throws CoderException, IOException {
+    if (this.innerSerde == null) {
+      this.innerSerde = HourOfDayKeySerde.create();
+    }
+    final DataInputStream dataIn = new DataInputStream(inStream);
+    final byte[] sizeInBytes = new byte[4];
+    dataIn.readFully(sizeInBytes);
+    final int size = ByteBuffer.wrap(sizeInBytes).getInt();
+    final byte[] bytes = new byte[size];
+    dataIn.readFully(bytes);
+    return this.innerSerde.deserializer().deserialize("deser", bytes);
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    // The serde writes a fixed-layout binary format, so this coder is deterministic.
+  }
+
+}
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/StatsAggregation.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/StatsAggregation.java
new file mode 100644
index 000000000..ee5cfc48b
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/StatsAggregation.java
@@ -0,0 +1,45 @@
+package application;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import titan.ccp.model.records.ActivePowerRecord;
+
+
+/**
+ * CombineFn that aggregates ActivePowerRecords into Guava Stats based on the valueInW field.
+ */
+
+@DefaultCoder(AvroCoder.class)
+public class StatsAggregation extends CombineFn<ActivePowerRecord, StatsAccumulator, Stats>
+    implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public StatsAccumulator createAccumulator() {
+    return new StatsAccumulator();
+  }
+
+  @Override
+  public StatsAccumulator addInput(final StatsAccumulator accum, final ActivePowerRecord input) {
+    accum.add(input.getValueInW());
+    return accum;
+  }
+
+  @Override
+  public StatsAccumulator mergeAccumulators(final Iterable<StatsAccumulator> accums) {
+    final StatsAccumulator merged = this.createAccumulator();
+    for (final StatsAccumulator accum : accums) {
+      merged.addAll(accum.snapshot());
+    }
+    return merged;
+  }
+
+  @Override
+  public Stats extractOutput(final StatsAccumulator accum) {
+    return accum.snapshot();
+  }
+}
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/StatsKeyFactory.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/StatsKeyFactory.java
new file mode 100644
index 000000000..820168058
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/StatsKeyFactory.java
@@ -0,0 +1,17 @@
+package application;
+
+import java.time.LocalDateTime;
+
+/**
+ * Factory interface for creating a stats key from a sensor id and a {@link LocalDateTime} object
+ * and vice versa.
+ *
+ * @param <T> Type of the key
+ */
+public interface StatsKeyFactory<T> {
+
+  T createKey(String sensorId, LocalDateTime dateTime);
+
+  String getSensorId(T key);
+
+}
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java
new file mode 100644
index 000000000..796f5a8cf
--- /dev/null
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java
@@ -0,0 +1,172 @@
+package application;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.HashMap;
+import org.apache.beam.runners.flink.FlinkRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.joda.time.Duration;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * Implementation of the use case Aggregation based on Time Attributes using Apache Beam with the
+ * Flink Runner. To run locally in standalone mode, start Kafka, Zookeeper, the Schema Registry
+ * and the workload generator using the delayed_startup.sh script, and configure the Kafka,
+ * Zookeeper and Schema Registry URLs accordingly. Start a Flink cluster and pass its REST
+ * address via the --flinkMaster run parameter. To persist logs, add
+ * ${workspace_loc:/uc3-beam-flink/eclipseConsoleLogs.log} as Output File under Standard
+ * Input Output in Common in the Run Configuration, then start via Eclipse Run.
+ */
+public class Uc3ApplicationBeam {
+
+
+  @SuppressWarnings("serial")
+  public static void main(final String[] args) {
+
+    // Set configuration for the sliding windows
+    final int windowDuration = Integer.parseInt(
+        System.getenv("KAFKA_WINDOW_DURATION_DAYS") != null
+            ? System.getenv("KAFKA_WINDOW_DURATION_DAYS")
+            : "30");
+    final Duration duration = Duration.standardDays(windowDuration);
+
+    final int aggregationAdvance = Integer.parseInt(
+        System.getenv("AGGREGATION_ADVANCE_DAYS") != null
+            ? System.getenv("AGGREGATION_ADVANCE_DAYS")
+            : "1");
+    final Duration advance = Duration.standardDays(aggregationAdvance);
+    final int triggerInterval = Integer.parseInt(
+        System.getenv("TRIGGER_INTERVAL") != null
System.getenv("TRIGGER_INTERVAL") + : "15"); + + final Duration triggerDelay = Duration.standardSeconds(triggerInterval); + + // Set Configuration for Kafka + final String bootstrapServer = + System.getenv("KAFKA_BOOTSTRAP_SERVERS") != null ? System.getenv("KAFKA_BOOTSTRAP_SERVERS") + : "my-confluent-cp-kafka:9092"; + final String inputTopic = System.getenv("INPUT") != null ? System.getenv("INPUT") : "input"; + final String outputTopic = System.getenv("OUTPUT") != null ? System.getenv("OUTPUT") : "output"; + final String schemaRegistryURL = + System.getenv("SCHEMA_REGISTRY_URL") != null ? System.getenv("SCHEMA_REGISTRY_URL") + : "http://my-confluent-cp-schema-registry:8081"; + + // Set consumer configuration for the schema registry and commits back to Kafka + final HashMap<String, Object> consumerConfig = new HashMap<>(); + consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerConfig.put("schema.registry.url", schemaRegistryURL); + consumerConfig.put("specific.avro.reader", "true"); + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application"); + final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); + + + final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); + options.setRunner(FlinkRunner.class); + options.setJobName("ucapplication"); + final Pipeline pipeline = Pipeline.create(options); + final CoderRegistry cr = pipeline.getCoderRegistry(); + + + cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); + cr.registerCoderForClass(HourOfDayKey.class, new HourOfDaykeyCoder()); + cr.registerCoderForClass(StatsAggregation.class, + SerializableCoder.of(StatsAggregation.class)); + cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class)); + + + + @SuppressWarnings({"rawtypes", "unchecked"}) + final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka = + KafkaIO.<String, ActivePowerRecord>read() + .withBootstrapServers(bootstrapServer) + .withTopic(inputTopic) + .withKeyDeserializer(StringDeserializer.class) + .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, + AvroCoder.of(ActivePowerRecord.class)) + .withConsumerConfigUpdates(consumerConfig) + // Set TimeStampPolicy for event time + .withTimestampPolicyFactory( + (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark)) + .withoutMetadata(); + // Apply pipeline transformations + // Read from Kafka + pipeline.apply(kafka) + // Map to correct time format + .apply(MapElements.via( + new SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey, ActivePowerRecord>>() { + final ZoneId zone = ZoneId.of("Europe/Paris"); + + @Override + public KV<application.HourOfDayKey, ActivePowerRecord> apply( + final KV<String, ActivePowerRecord> kv) { + final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp()); + final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); + return KV.of(keyFactory.createKey(kv.getValue().getIdentifier(), dateTime), + kv.getValue()); + } + })) + + // Apply a sliding window + .apply(Window + .<KV<HourOfDayKey, ActivePowerRecord>>into(SlidingWindows.of(duration).every(advance)) + .triggering(AfterWatermark.pastEndOfWindow() + .withEarlyFirings( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(triggerDelay))) + .withAllowedLateness(Duration.ZERO) + .accumulatingFiredPanes()) + + // Aggregate per window for every key + 
+        .apply(Combine.<HourOfDayKey, ActivePowerRecord, Stats>perKey(
+            new StatsAggregation()))
+        .setCoder(KvCoder.of(new HourOfDaykeyCoder(), SerializableCoder.of(Stats.class)))
+
+        // Map into correct output format
+        .apply(MapElements
+            .via(new SimpleFunction<KV<HourOfDayKey, Stats>, KV<String, String>>() {
+              @Override
+              public KV<String, String> apply(final KV<HourOfDayKey, Stats> kv) {
+                return KV.of(keyFactory.getSensorId(kv.getKey()), kv.getValue().toString());
+              }
+            }))
+        // Write to Kafka
+        .apply(KafkaIO.<String, String>write()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(outputTopic)
+            .withKeySerializer(StringSerializer.class)
+            .withValueSerializer(StringSerializer.class));
+
+
+    pipeline.run().waitUntilFinish();
+
+
+
+  }
+}
-- 
GitLab
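
Reviewer note (not part of the patch): since HourOfDaykeyCoder delegates to HourOfDayKeySerde, the key handling can be sanity-checked outside the pipeline with a plain serde round trip. The following is a minimal sketch under assumptions: the class name Uc3KeySerdeSmokeTest, the sensor id, and the timestamp are made up, and the titan.ccp simpleserdes dependency from the build.gradle above is assumed to be on the classpath.

package application;

import java.time.LocalDateTime;
import org.apache.kafka.common.serialization.Serde;

// Hypothetical smoke test: builds an HourOfDayKey via the factory, round-trips
// it through the HourOfDayKeySerde, and verifies both fields survive.
public final class Uc3KeySerdeSmokeTest {

  public static void main(final String[] args) {
    final StatsKeyFactory<HourOfDayKey> factory = new HourOfDayKeyFactory();
    // 2021-10-22T10:28 -> hourOfDay = 10
    final HourOfDayKey key =
        factory.createKey("sensor-42", LocalDateTime.of(2021, 10, 22, 10, 28));

    final Serde<HourOfDayKey> serde = HourOfDayKeySerde.create();
    final byte[] bytes = serde.serializer().serialize("dummy-topic", key);
    final HourOfDayKey copy = serde.deserializer().deserialize("dummy-topic", bytes);

    if (copy.getHourOfDay() != key.getHourOfDay()
        || !factory.getSensorId(copy).equals(factory.getSensorId(key))) {
      throw new AssertionError("Serde round trip failed: " + copy);
    }
    System.out.println("Serde round trip OK: " + copy);
  }
}

The same round trip also exercises the length-prefixed encode/decode path of HourOfDaykeyCoder, since the coder wraps exactly this serde.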