diff --git a/theodolite-benchmarks/settings.gradle b/theodolite-benchmarks/settings.gradle
index 2c780c3e894530ae15ef91692cf1db1c5ab9fd58..208292aaf9911838363ee5633a01594925099fde 100644
--- a/theodolite-benchmarks/settings.gradle
+++ b/theodolite-benchmarks/settings.gradle
@@ -14,6 +14,7 @@ include 'uc1-beam-flink'
 include 'uc2-load-generator'
 include 'uc2-kstreams'
 include 'uc2-flink'
+include 'uc2-beam-flink'
 
 include 'uc3-load-generator'
 include 'uc3-kstreams'
diff --git a/theodolite-benchmarks/uc2-beam-flink/Dockerfile b/theodolite-benchmarks/uc2-beam-flink/Dockerfile
new file mode 100644
index 0000000000000000000000000000000000000000..baee343ad95295b44ee1d459a867a052c45b24c9
--- /dev/null
+++ b/theodolite-benchmarks/uc2-beam-flink/Dockerfile
@@ -0,0 +1,5 @@
+FROM openjdk:8-slim
+
+ADD build/distributions/uc2-beam-flink.tar /
+
+CMD /uc2-beam-flink/bin/uc2-beam-flink --runner=FlinkRunner --flinkMaster=flink-jobmanager:8081 --streaming --parallelism=$PARALLELISM --disableMetrics=true --fasterCopy
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc2-beam-flink/build.gradle b/theodolite-benchmarks/uc2-beam-flink/build.gradle
new file mode 100644
index 0000000000000000000000000000000000000000..00ee51e800bbc5d151e205bccdf5c8a0f1d04a4b
--- /dev/null
+++ b/theodolite-benchmarks/uc2-beam-flink/build.gradle
@@ -0,0 +1,29 @@
+plugins {
+  id 'theodolite.kstreams'
+}
+
+allprojects {
+  repositories {
+    maven {
+      url 'https://packages.confluent.io/maven/'
+    }
+    mavenCentral()
+  }
+}
+
+
+dependencies {
+  compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.27.0'
+  compile group: 'org.apache.beam', name: 'beam-runners-flink-1.12', version: '2.27.0'
+
+  compile('org.apache.beam:beam-sdks-java-io-kafka:2.27.0') {
+    exclude group: 'org.apache.kafka', module: 'kafka-clients'
+  }
+  compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
+
+  runtime 'org.apache.beam:beam-runners-direct-java:2.27.0'
+  runtime 'org.slf4j:slf4j-api:1.7.32'
+  runtime 'org.slf4j:slf4j-jdk14:1.7.32'
+}
+
+mainClassName = "application.Uc2ApplicationBeam"
diff --git a/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/StatsAggregation.java b/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/StatsAggregation.java
new file mode 100644
index 0000000000000000000000000000000000000000..270e42f6496754117ffdcffd0918b0b178deda13
--- /dev/null
+++ b/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/StatsAggregation.java
@@ -0,0 +1,45 @@
+package application;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * Aggregation class for ActivePowerRecords. Builds a StatsAccumulator from the valueInW field.
+ */
+
+@DefaultCoder(AvroCoder.class)
+public class StatsAggregation extends CombineFn<ActivePowerRecord, StatsAccumulator, Stats>
+    implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public StatsAccumulator createAccumulator() {
+    return new StatsAccumulator();
+  }
+
+  @Override
+  public StatsAccumulator addInput(final StatsAccumulator accum, final ActivePowerRecord input) {
+    accum.add(input.getValueInW());
+    return accum;
+  }
+
+  @Override
+  public StatsAccumulator mergeAccumulators(final Iterable<StatsAccumulator> accums) {
+    final StatsAccumulator merged = this.createAccumulator();
+    for (final StatsAccumulator accum : accums) {
+      merged.addAll(accum.snapshot());
+    }
+    return merged;
+  }
+
+  @Override
+  public Stats extractOutput(final StatsAccumulator accum) {
+    return accum.snapshot();
+  }
+}
diff --git a/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2ApplicationBeam.java b/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2ApplicationBeam.java
new file mode 100644
index 0000000000000000000000000000000000000000..e001d7219f69203c9198f6fdc978874345add207
--- /dev/null
+++ b/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2ApplicationBeam.java
@@ -0,0 +1,139 @@
+package application;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import java.util.HashMap;
+import org.apache.beam.runners.flink.FlinkRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.joda.time.Duration;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * Implementation of the use case Downsampling using Apache Beam with the Flink Runner. To execute
+ * locally in standalone mode, start Kafka, ZooKeeper, the schema registry and the workload
+ * generator using the delayed_startup.sh script. Then start a Flink cluster and pass its REST
+ * address via the --flinkMaster run parameter.
+ */
+public final class Uc2ApplicationBeam {
+  private static final String JOB_NAME = "Uc2Application";
+  private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
+  private static final String INPUT = "INPUT";
+  private static final String OUTPUT = "OUTPUT";
+  private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
+  private static final String YES = "true";
+  private static final String USE_AVRO_READER = YES;
+  private static final String AUTO_COMMIT_CONFIG = YES;
+  private static final String KAFKA_WINDOW_DURATION_MINUTES = "KAFKA_WINDOW_DURATION_MINUTES";
+
+  /**
+   * Private constructor to avoid instantiation.
+   */
+  private Uc2ApplicationBeam() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Start running this microservice.
+   */
+  @SuppressWarnings({"serial", "unchecked", "rawtypes"})
+  public static void main(final String[] args) {
+
+    // Set the window configuration
+    final int windowDurationMinutes = Integer.parseInt(
+        System.getenv(KAFKA_WINDOW_DURATION_MINUTES) == null
+            ? "1"
+            : System.getenv(KAFKA_WINDOW_DURATION_MINUTES));
+    final Duration duration = Duration.standardMinutes(windowDurationMinutes);
+
+    // Set the Kafka configuration
+    final String bootstrapServer =
+        System.getenv(BOOTSTRAP) == null ? "my-confluent-cp-kafka:9092"
+            : System.getenv(BOOTSTRAP);
+    final String inputTopic = System.getenv(INPUT) == null ? "input" : System.getenv(INPUT);
+    final String outputTopic = System.getenv(OUTPUT) == null ? "output" : System.getenv(OUTPUT);
+    final String schemaRegistryUrl =
+        System.getenv(SCHEMA_REGISTRY) == null ? "http://my-confluent-cp-schema-registry:8081"
+            : System.getenv(SCHEMA_REGISTRY);
+
+    // Set the consumer configuration for the schema registry and offset commits back to Kafka
+    final HashMap<String, Object> consumerConfig = new HashMap<>();
+    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG);
+    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    consumerConfig.put("schema.registry.url", schemaRegistryUrl);
+    consumerConfig.put("specific.avro.reader", USE_AVRO_READER);
+    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "ucapplication");
+
+    // Create pipeline options from args
+    final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+    options.setJobName(JOB_NAME);
+    options.setRunner(FlinkRunner.class);
+
+    final Pipeline pipeline = Pipeline.create(options);
+    final CoderRegistry cr = pipeline.getCoderRegistry();
+
+    // Set coders for classes that will be distributed
+    cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
+    cr.registerCoderForClass(StatsAggregation.class,
+        SerializableCoder.of(StatsAggregation.class));
+    cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
+
+    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
+        KafkaIO.<String, ActivePowerRecord>read()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(inputTopic)
+            .withKeyDeserializer(StringDeserializer.class)
+            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
+                AvroCoder.of(ActivePowerRecord.class))
+            .withConsumerConfigUpdates(consumerConfig)
+            .withoutMetadata();
+
+    // Apply pipeline transformations
+    // Read from Kafka
+    pipeline.apply(kafka)
+        // Apply a fixed window
+        .apply(Window.<KV<String, ActivePowerRecord>>into(FixedWindows.of(duration)))
+        // Aggregate per window for every key
+        .apply(Combine.<String, ActivePowerRecord, Stats>perKey(new StatsAggregation()))
+        .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Stats.class)))
+        // Map into correct output format
+        .apply(MapElements
+            .via(new SimpleFunction<KV<String, Stats>, KV<String, String>>() {
+              @Override
+              public KV<String, String> apply(final KV<String, Stats> kv) {
+                return KV.of(kv.getKey(), kv.getValue().toString());
+              }
+            }))
+        // Write to Kafka
+        .apply(KafkaIO.<String, String>write()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(outputTopic)
+            .withKeySerializer(StringSerializer.class)
+            .withValueSerializer(StringSerializer.class));
+
+    pipeline.run().waitUntilFinish();
+  }
+}
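
Note: StatsAggregation is a plain CombineFn, so its semantics can be checked without running a
pipeline. A minimal sketch, assuming the Avro-generated all-args constructor
ActivePowerRecord(String identifier, Long timestamp, Double valueInW); the class name
StatsAggregationCheck, the sensor name, and the values are made up for illustration:

package application;

import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import titan.ccp.model.records.ActivePowerRecord;

public final class StatsAggregationCheck {
  public static void main(final String[] args) {
    final StatsAggregation aggregation = new StatsAggregation();
    StatsAccumulator accumulator = aggregation.createAccumulator();
    accumulator = aggregation.addInput(accumulator, new ActivePowerRecord("sensor1", 0L, 10.0));
    accumulator = aggregation.addInput(accumulator, new ActivePowerRecord("sensor1", 1L, 20.0));
    // extractOutput yields the immutable Stats snapshot whose toString() the pipeline writes to Kafka
    final Stats stats = aggregation.extractOutput(accumulator);
    System.out.println(stats.count()); // 2
    System.out.println(stats.mean());  // 15.0
  }
}

This mirrors what Combine.perKey computes per key and window; in a distributed run, Beam
additionally calls mergeAccumulators to combine partial accumulators from different bundles.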
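The combine step can also be exercised end-to-end on Beam's direct runner (without the fixed
window, test input falls into the global window). A hypothetical JUnit 4 sketch, assuming
beam-runners-direct-java and junit on the test classpath and the same made-up record values:

package application;

import com.google.common.math.Stats;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import titan.ccp.model.records.ActivePowerRecord;

public class StatsAggregationTest {

  @Rule
  public final transient TestPipeline testPipeline = TestPipeline.create();

  @Test
  public void aggregatesAllValuesOfAKey() {
    final PCollection<KV<String, Stats>> result = this.testPipeline
        // Bounded test input with explicit coders, mirroring the coders set in main()
        .apply(Create.of(
            KV.of("sensor1", new ActivePowerRecord("sensor1", 0L, 10.0)),
            KV.of("sensor1", new ActivePowerRecord("sensor1", 1L, 20.0)))
            .withCoder(KvCoder.of(StringUtf8Coder.of(),
                AvroCoder.of(ActivePowerRecord.class))))
        .apply(Combine.<String, ActivePowerRecord, Stats>perKey(new StatsAggregation()))
        .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Stats.class)));

    // Guava's Stats has value-based equality, so the expected snapshot can be compared directly
    PAssert.that(result).containsInAnyOrder(KV.of("sensor1", Stats.of(10.0, 20.0)));
    this.testPipeline.run().waitUntilFinish();
  }
}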