Commit 66ae4c77 authored by Lorenz Boguhn

Change uc1-beam-flink to use beam commons

parent 191b9eb1
1 merge request: !187 Migrate Beam benchmark implementation
@@ -2,14 +2,12 @@ package application;
 import com.google.gson.Gson;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import java.util.HashMap;
+import java.util.Properties;
 import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -18,10 +16,11 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.beam.AbstractBeamService;
+import theodolite.commons.beam.ConfigurationKeys;
 import titan.ccp.model.records.ActivePowerRecord;
 
 /**
@@ -32,20 +31,20 @@ import titan.ccp.model.records.ActivePowerRecord;
  * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard
  * Input Output in Common in the Run Configuration Start via Eclipse Run.
  */
-public final class Uc1ApplicationBeam {
+public final class Uc1ApplicationBeam extends AbstractBeamService {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(Uc1ApplicationBeam.class);
-  private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
-  private static final String INPUT = "INPUT";
-  private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
-  private static final String YES = "true";
-  private static final String USE_AVRO_READER = YES;
-  private static final String AUTO_COMMIT_CONFIG = YES;
+  private final String inputTopic = CONFIG.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
+  private final String bootstrapServer =
+      CONFIG.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
 
   /**
-   * Private constructor to avoid instantiation.
+   * Private constructor setting specific options for this use case.
    */
-  private Uc1ApplicationBeam() {
-    throw new UnsupportedOperationException();
+  private Uc1ApplicationBeam(final String[] args) { //NOPMD
+    super(args);
+    LOGGER.info(this.options.toString());
+    this.options.setRunner(FlinkRunner.class);
   }
 
   /**
@@ -55,45 +54,31 @@ public final class Uc1ApplicationBeam {
   @SuppressWarnings({"unchecked", "rawtypes", "unused"})
   public static void main(final String[] args) {
 
-    // Set Configuration for Kafka
-    final String bootstrapServer =
-        System.getenv(BOOTSTRAP) == null ? "my-confluent-cp-kafka:9092"
-            : System.getenv(BOOTSTRAP);
-    final String inputTopic = System.getenv(INPUT) == null ? "input" : System.getenv(INPUT);
-    final String schemaRegistryUrl =
-        System.getenv(SCHEMA_REGISTRY) == null ? "http://my-confluent-cp-schema-registry:8081"
-            : System.getenv(SCHEMA_REGISTRY);
-    // Set consumer configuration for the schema registry and commits back to Kafka
-    final HashMap<String, Object> consumerConfig = new HashMap<>();
-    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG);
-    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-    consumerConfig.put("schema.registry.url", schemaRegistryUrl);
-    consumerConfig.put("specific.avro.reader", USE_AVRO_READER);
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application");
-    final LogKeyValue logKeyValue = new LogKeyValue();
-    final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
-    options.setJobName("ucapplication");
-    options.setRunner(FlinkRunner.class);
-    final Pipeline pipeline = Pipeline.create(options);
-    final CoderRegistry cr = pipeline.getCoderRegistry();
+    final Uc1ApplicationBeam uc1 = new Uc1ApplicationBeam(args);
+
+    // create pipeline
+    final Pipeline pipeline = Pipeline.create(uc1.options);
 
     // Set Coders for Classes that will be distributed
+    final CoderRegistry cr = pipeline.getCoderRegistry();
     cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
 
+    // build KafkaConsumerConfig
+    final Properties consumerConfig = uc1.buildConsumerConfig();
+
     // Create Pipeline transformations
     final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
         KafkaIO.<String, ActivePowerRecord>read()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(inputTopic)
+            .withBootstrapServers(uc1.bootstrapServer)
+            .withTopic(uc1.inputTopic)
             .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
                AvroCoder.of(ActivePowerRecord.class))
            .withConsumerConfigUpdates(consumerConfig)
            .withoutMetadata();
 
+    final LogKeyValue logKeyValue = new LogKeyValue();
+
     // Apply pipeline transformations
     // Read from Kafka
     pipeline.apply(kafka)
@@ -119,6 +104,7 @@ public final class Uc1ApplicationBeam {
     pipeline.run().waitUntilFinish();
   }
 
   /**
    * Logs all Key Value pairs.
    */
...
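The two theodolite.commons.beam classes this commit starts using are not part of the diff. Judging only from their usage above, AbstractBeamService parses the CLI arguments into Beam PipelineOptions, exposes a CONFIG object backed by the application properties, and assembles the Kafka consumer settings that were previously built by hand in main(). A minimal sketch under those assumptions follows; the Configuration source, the job-name handling, and the exact consumer keys (mirrored from the code removed above) are guesses, not the actual implementation:

package theodolite.commons.beam;

import java.util.Properties;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import titan.ccp.common.configuration.ServiceConfigurations;

// Hypothetical sketch, inferred from the usage in this commit; the real
// AbstractBeamService may differ.
public abstract class AbstractBeamService {

  // Assumed: configuration loaded from application.properties via the Titan CCP
  // commons; the diff only shows CONFIG.getString(...) being called.
  public static final Configuration CONFIG = ServiceConfigurations.createWithDefaults();

  // Pipeline options parsed from the arguments handed to main().
  protected final PipelineOptions options;

  protected AbstractBeamService(final String[] args) {
    this.options = PipelineOptionsFactory.fromArgs(args).create();
    this.options.setJobName(CONFIG.getString(ConfigurationKeys.APPLICATION_NAME));
  }

  // Assumed helper: builds the consumer configuration that main() passes to
  // KafkaIO via withConsumerConfigUpdates(...), mirroring the removed HashMap.
  public Properties buildConsumerConfig() {
    final Properties props = new Properties();
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
        CONFIG.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
        CONFIG.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
    props.put("schema.registry.url",
        CONFIG.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
    props.put("specific.avro.reader",
        CONFIG.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
    return props;
  }
}

Moving argument parsing and consumer setup into this base class is what lets the commit delete the environment-variable lookups and the hand-built HashMap from main(). A matching sketch of ConfigurationKeys follows the properties file below.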
application.name=theodolite-uc1-application
application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
schema.registry.url=http://localhost:8081
num.threads=1
commit.interval.ms=1000
cache.max.bytes.buffering=-1
specific.avro.reader=True
enable.auto.commit.config=True
auto.offset.reset.config=earliest
\ No newline at end of file
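The keys in this properties file line up with the ConfigurationKeys constants referenced in the Java diff. Only KAFKA_INPUT_TOPIC and KAFKA_BOOTSTRAP_SERVERS are confirmed by the code above; the remaining constants in this sketch are assumptions read off the property names:

package theodolite.commons.beam;

// Hypothetical sketch of the shared key holder; constant names beyond
// KAFKA_INPUT_TOPIC and KAFKA_BOOTSTRAP_SERVERS are assumed, not confirmed.
public final class ConfigurationKeys {

  public static final String APPLICATION_NAME = "application.name";
  public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
  public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
  public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
  public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
  public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit.config";
  public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset.config";
  public static final String SPECIFIC_AVRO_READER = "specific.avro.reader";

  private ConfigurationKeys() {}
}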