Commit 786b874b authored by Lorenz Boguhn

Add uc2-beam-flink +

Put inputTopic and bootstrapServer variable inside AbstractPipeline
parent 1e6586cf
1 merge request: !187 Migrate Beam benchmark implementation
Showing changed files with 183 additions and 25 deletions
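To illustrate the refactoring described in the commit message, here is a minimal, hypothetical sketch of how a use-case pipeline now reuses the Kafka settings inherited from AbstractPipeline instead of reading them from the configuration itself. The subclass name UcXBeamPipeline is invented for illustration; AbstractPipeline, its inputTopic/bootstrapServer fields, and the KafkaActivePowerRecordReader constructor are taken from the diffs below, and buildConsumerConfig() is assumed to be the helper used by Uc2BeamPipeline further down.

package application;

import java.util.Properties;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.commons.configuration2.Configuration;
import theodolite.commons.beam.AbstractPipeline;
import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader;

// Hypothetical subclass for illustration only; not part of this commit.
public final class UcXBeamPipeline extends AbstractPipeline {

  protected UcXBeamPipeline(final PipelineOptions options, final Configuration config) {
    super(options, config);
    // inputTopic and bootstrapServer are now inherited from AbstractPipeline,
    // so the subclass no longer calls config.getString(...) for them.
    final Properties consumerConfig = buildConsumerConfig(); // assumed helper from beam-commons
    this.apply(new KafkaActivePowerRecordReader(bootstrapServer, inputTopic, consumerConfig));
  }
}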
@@ -14,9 +14,15 @@ public class AbstractPipeline extends Pipeline {
   // Application Configurations
   private final Configuration config;

+  protected final String inputTopic;
+  protected final String bootstrapServer;
+
   protected AbstractPipeline(final PipelineOptions options, final Configuration config) {
     super(options);
     this.config = config;
+
+    inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
+    bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
   }

   /**
@@ -6,6 +6,7 @@ include 'flink-commons'
 include 'beam-commons'

 include 'uc1-beam'
+include 'uc2-beam'

 include 'uc1-load-generator'
 include 'uc1-kstreams'
@@ -33,12 +33,8 @@ public final class Uc1BeamPipeline extends AbstractPipeline {
   Uc1BeamPipeline(PipelineOptions options, Configuration config) {
     super(options, config);

-    // Additional needed fields
-    String inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
-    String bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
-
     // Set Coders for Classes that will be distributed
     final CoderRegistry cr = this.getCoderRegistry();
     cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
 plugins {
-  id 'theodolite.kstreams'
+  id 'theodolite.beam'
 }

-allprojects {
-  repositories {
-    maven {
-      url 'https://packages.confluent.io/maven/'
-    }
-    mavenCentral()
-  }
-}

 dependencies {
-  compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
   compile group: 'org.apache.beam', name: 'beam-runners-flink-1.12', version: '2.27.0'
+  compile project(':uc2-beam')
-  compile('org.apache.beam:beam-sdks-java-io-kafka:2.22.0'){
-    exclude group: 'org.apache.kafka', module: 'kafka-clients'
-  }
-  compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
-  runtime 'org.apache.beam:beam-runners-direct-java:2.22.0'
-  runtime 'org.slf4j:slf4j-api:1.7.32'
-  runtime 'org.slf4j:slf4j-jdk14:1.7.32'
 }

-mainClassName = "application.Uc2ApplicationBeam"
+mainClassName = "application.Uc2BeamFlink"
package application;

import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import theodolite.commons.beam.AbstractBeamService;

/**
 * Implementation of the use case Downsampling using Apache Beam with the Flink Runner. To execute
 * locally in standalone mode, start Kafka, Zookeeper, the schema registry and the workload
 * generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST address
 * using --flinkMaster as run parameter.
 */
public final class Uc2BeamFlink extends AbstractBeamService {

  private static final String JOB_NAME = "Uc2Application";
  private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
  private static final String INPUT = "INPUT";
  private static final String OUTPUT = "OUTPUT";
  private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
  private static final String YES = "true";
  private static final String USE_AVRO_READER = YES;
  private static final String AUTO_COMMIT_CONFIG = YES;
  private static final String KAFKA_WINDOW_DURATION_MINUTES = "KAFKA_WINDOW_DURATION_MINUTES";

  /**
   * Private constructor setting specific options for this use case.
   */
  private Uc2BeamFlink(final String[] args) { //NOPMD
    super(args);
    this.options.setRunner(FlinkRunner.class);
  }

  /**
   * Start running this microservice.
   */
  @SuppressWarnings({"serial", "unchecked", "rawtypes"})
  public static void main(final String[] args) {
    Uc2BeamFlink uc2BeamFlink = new Uc2BeamFlink(args);
    Pipeline pipeline = new Uc2BeamPipeline(uc2BeamFlink.options, uc2BeamFlink.getConfig());
    pipeline.run().waitUntilFinish();
  }
}
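The Javadoc above says the Flink cluster's REST address is passed via the --flinkMaster run parameter. As a hedged sketch of what that argument maps to programmatically: FlinkPipelineOptions is Beam's Flink runner options interface, while the class name FlinkMasterExample and the address localhost:8081 are purely illustrative assumptions.

package application;

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public final class FlinkMasterExample {

  public static void main(final String[] args) {
    // Equivalent to passing --flinkMaster=localhost:8081 on the command line.
    final FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setFlinkMaster("localhost:8081"); // illustrative REST address of the Flink cluster
    System.out.println("Flink master set to: " + options.getFlinkMaster());
  }
}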
plugins {
  id 'theodolite.beam'
}
package application;

import com.google.common.math.Stats;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;

/**
 * Transforms a {@code KV<String, Stats>} into a {@code KV<String, String>}.
 */
public class StatsToString extends SimpleFunction<KV<String, Stats>, KV<String, String>> {

  private static final long serialVersionUID = 4308991244493097240L;

  @Override
  public KV<String, String> apply(final KV<String, Stats> kv) {
    return KV.of(kv.getKey(), kv.getValue().toString());
  }
}
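A small, self-contained sketch of how this SimpleFunction plugs into MapElements, mirroring the mapping step in the Uc2BeamPipeline diff below. The input values and the class name StatsToStringExample are hypothetical, and a Beam runner (e.g. the direct runner) is assumed to be on the classpath.

package application;

import com.google.common.math.Stats;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;

public final class StatsToStringExample {

  public static void main(final String[] args) {
    final Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        // Hypothetical input: one sensor key with pre-aggregated statistics.
        .apply(Create.of(KV.of("sensor1", Stats.of(1.0, 2.0, 3.0)))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Stats.class))))
        // Same mapping step as in Uc2BeamPipeline: KV<String, Stats> -> KV<String, String>.
        .apply(MapElements.via(new StatsToString()));

    pipeline.run().waitUntilFinish();
  }
}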
@@ -2,115 +2,76 @@ package application;

 import com.google.common.math.Stats;
 import com.google.common.math.StatsAccumulator;
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import java.util.HashMap;
-import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.sdk.Pipeline;
+import java.util.Properties;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.joda.time.Duration;
+import theodolite.commons.beam.AbstractPipeline;
+import theodolite.commons.beam.ConfigurationKeys;
+import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader;
+import theodolite.commons.beam.kafka.KafkaWriterTransformation;
 import titan.ccp.model.records.ActivePowerRecord;

 /**
- * Implementation of the use case Downsampling using Apache Beam with the Flink Runner. To execute
- * locally in standalone start Kafka, Zookeeper, the schema-registry and the workload generator
- * using the delayed_startup.sh script. Start a Flink cluster and pass its REST adress
- * using--flinkMaster as run parameter.
+ * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To
+ * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload
+ * generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST adress
+ * using--flinkMaster as run parameter. To persist logs add
+ * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard
+ * Input Output in Common in the Run Configuration Start via Eclipse Run.
  */
-public final class Uc2ApplicationBeam {
-
-  private static final String JOB_NAME = "Uc2Application";
-  private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
-  private static final String INPUT = "INPUT";
-  private static final String OUTPUT = "OUTPUT";
-  private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
-  private static final String YES = "true";
-  private static final String USE_AVRO_READER = YES;
-  private static final String AUTO_COMMIT_CONFIG = YES;
-  private static final String KAFKA_WINDOW_DURATION_MINUTES = "KAFKA_WINDOW_DURATION_MINUTES";
-
-  /**
-   * Private constructor to avoid instantiation.
-   */
-  private Uc2ApplicationBeam() {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Start running this microservice.
-   */
-  @SuppressWarnings({"serial", "unchecked", "rawtypes"})
-  public static void main(final String[] args) {
-    // Set Configuration for Windows
-    final int windowDurationMinutes = Integer.parseInt(
-        System.getenv(KAFKA_WINDOW_DURATION_MINUTES) == null
-            ? "1"
-            : System.getenv(KAFKA_WINDOW_DURATION_MINUTES));
+public final class Uc2BeamPipeline extends AbstractPipeline {
+
+  protected Uc2BeamPipeline(PipelineOptions options, Configuration config) {
+    super(options, config);
+    // Additional needed variables
+    String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
+
+    final int windowDurationMinutes = Integer.parseInt(
+        config.getString(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES));
     final Duration duration = Duration.standardMinutes(windowDurationMinutes);

-    // Set Configuration for Kafka
-    final String bootstrapServer =
-        System.getenv(BOOTSTRAP) == null ? "my-confluent-cp-kafka:9092"
-            : System.getenv(BOOTSTRAP);
-    final String inputTopic = System.getenv(INPUT) == null ? "input" : System.getenv(INPUT);
-    final String outputTopic = System.getenv(OUTPUT) == null ? "output" : System.getenv(OUTPUT);
-    final String schemaRegistryUrl =
-        System.getenv(SCHEMA_REGISTRY) == null ? "http://my-confluent-cp-schema-registry:8081"
-            : System.getenv(SCHEMA_REGISTRY);
-
-    // Set consumer configuration for the schema registry and commits back to Kafka
-    final HashMap<String, Object> consumerConfig = new HashMap<>();
-    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG);
-    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-    consumerConfig.put("schema.registry.url", schemaRegistryUrl);
-    consumerConfig.put("specific.avro.reader", USE_AVRO_READER);
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "ucaplication");
-
-    // Create Pipeline Options from args.
-    final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
-    options.setJobName(JOB_NAME);
-    options.setRunner(FlinkRunner.class);
-
-    final Pipeline pipeline = Pipeline.create(options);
-    final CoderRegistry cr = pipeline.getCoderRegistry();
+    // Build kafka configuration
+    Properties consumerConfig = buildConsumerConfig();

     // Set Coders for Classes that will be distributed
+    final CoderRegistry cr = this.getCoderRegistry();
     cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
     cr.registerCoderForClass(StatsAggregation.class,
         SerializableCoder.of(StatsAggregation.class));
     cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));

-    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
-        KafkaIO.<String, ActivePowerRecord>read()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(inputTopic)
-            .withKeyDeserializer(StringDeserializer.class)
-            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
-                AvroCoder.of(ActivePowerRecord.class))
-            .withConsumerConfigUpdates(consumerConfig)
-            .withoutMetadata();
-
-    // Apply pipeline transformations
     // Read from Kafka
-    pipeline.apply(kafka)
+    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>>
+        kafkaActivePowerRecordReader =
+        new KafkaActivePowerRecordReader(bootstrapServer, inputTopic, consumerConfig);
+
+    // Transform into String
+    final StatsToString statsToString = new StatsToString();
+
+    // Write to Kafka
+    final PTransform<PCollection<KV<String, String>>, POutput> kafkaWriter =
+        new KafkaWriterTransformation(bootstrapServer, outputTopic, StringSerializer.class);
+
+    // Apply pipeline transformations
+    this.apply(kafkaActivePowerRecordReader)
         // Apply a fixed window
         .apply(Window
             .<KV<String, ActivePowerRecord>>into(FixedWindows.of(duration)))
@@ -120,20 +81,9 @@ public final class Uc2ApplicationBeam {
         .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Stats.class)))
         // Map into correct output format
         .apply(MapElements
-            .via(new SimpleFunction<KV<String, Stats>, KV<String, String>>() {
-              @Override
-              public KV<String, String> apply(final KV<String, Stats> kv) {
-                return KV.of(kv.getKey(), kv.getValue().toString());
-              }
-            }))
+            .via(statsToString))
         // Write to Kafka
-        .apply(KafkaIO.<String, String>write()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(outputTopic)
-            .withKeySerializer(StringSerializer.class)
-            .withValueSerializer(StringSerializer.class));
-
-    pipeline.run().waitUntilFinish();
   }
 }
application.name=theodolite-uc1-application
application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
kafka.window.duration.minutes=1
schema.registry.url=http://localhost:8081
num.threads=1
commit.interval.ms=1000
cache.max.bytes.buffering=-1
specific.avro.reader=True
enable.auto.commit.config=True
auto.offset.reset.config=earliest
\ No newline at end of file
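For reference, a minimal sketch of how the keys in this properties file surface in the code via ConfigurationKeys. The ConfigExample class and the direct loading of the file with Commons Configuration are illustrative assumptions; in the benchmark itself the Theodolite commons load the configuration for the services.

package application;

import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.builder.fluent.Configurations;
import org.apache.commons.configuration2.ex.ConfigurationException;
import theodolite.commons.beam.ConfigurationKeys;

public final class ConfigExample {

  public static void main(final String[] args) throws ConfigurationException {
    // Illustrative only: load the properties file directly with Commons Configuration.
    final Configuration config = new Configurations().properties("application.properties");

    // The same keys the pipeline classes read through ConfigurationKeys:
    final String bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
    final String inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
    final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
    final int windowMinutes =
        Integer.parseInt(config.getString(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES));

    System.out.printf("%s %s %s %d%n", bootstrapServer, inputTopic, outputTopic, windowMinutes);
  }
}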