Skip to content
Snippets Groups Projects
Commit 191b9eb1 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Add beam common project + plugin

parent cb48d6b5
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
plugins {
id 'theodolite.java-commons'
}
repositories {
jcenter()
maven {
url "https://oss.sonatype.org/content/repositories/snapshots/"
}
maven {
url 'https://packages.confluent.io/maven/'
}
}
dependencies {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'com.google.code.gson:gson:2.8.2'
implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.slf4j:slf4j-simple:1.7.25'
implementation('org.apache.beam:beam-sdks-java-io-kafka:2.22.0'){
exclude group: 'org.apache.kafka', module: 'kafka-clients'
}
implementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
// compile group: 'org.apache.beam', name: 'beam-runners-flink-1.12', version: '2.27.0'
runtimeOnly 'org.slf4j:slf4j-api:1.7.32'
runtimeOnly 'org.slf4j:slf4j-jdk14:1.7.32'
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
}
package theodolite.commons.beam;
import java.util.Properties;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import titan.ccp.common.configuration.ServiceConfigurations;
/**
* Abstraction of a beam microservice.
*/
public class AbstractBeamService {
// Application Configurations
public static final Configuration CONFIG = ServiceConfigurations.createWithDefaults();
public static final String APPLICATION_NAME =
CONFIG.getString(ConfigurationKeys.APPLICATION_NAME);
// Beam Pipeline
protected PipelineOptions options;
public AbstractBeamService(String[] args) {
options = PipelineOptionsFactory.fromArgs(args).create();
options.setJobName(APPLICATION_NAME);
}
/**
* Abstract main for a Beam Service.
*/
public static void main(final String[] args) {
AbstractBeamService service = new AbstractBeamService(args);
service.run();
}
public void run() {
}
/**
* Builds a simple configuration for a Kafka consumer.
*
* @return the build Kafka consumer configuration.
*/
public Properties buildConsumerConfig() {
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
CONFIG.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
CONFIG
.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
consumerConfig.put("schema.registry.url",
CONFIG.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
consumerConfig.put("specific.avro.reader",
CONFIG.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APPLICATION_NAME);
return consumerConfig;
}
}
package theodolite.commons.beam;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
// Common keys
public static final String APPLICATION_NAME = "application.name";
public static final String APPLICATION_VERSION = "application.version";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
// Additional topics
public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic";
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic";
// UC2
public static final String EMIT_PERIOD_MS = "emit.period.ms";
public static final String GRACE_PERIOD_MS = "grace.period.ms";
// UC3
public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
// UC4
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
// BEAM
public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit.config";
public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset.config";
public static final String SPECIFIC_AVRO_READER = "specific.avro.reader";
private ConfigurationKeys() {
}
}
package theodolite.commons.beam.kafka;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Map;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
*/
public class KafkaAggregatedPowerRecordReader extends
PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
/**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public KafkaAggregatedPowerRecordReader(String bootstrapServer, String inputTopic,
Map<Object, Object> consumerConfig) {
super();
// Check if boostrap server and inputTopic are defined
if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) {
throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
}
reader =
KafkaIO.<String, ActivePowerRecord>read()
.withBootstrapServers(bootstrapServer)
.withTopic(inputTopic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
AvroCoder.of(ActivePowerRecord.class))
.withConsumerConfigUpdates(consumerConfig)
.withoutMetadata();
}
@Override
public PCollection<KV<String, ActivePowerRecord>> expand(PBegin input) {
return input.apply(this.reader);
}
}
......@@ -26,7 +26,7 @@ dependencies {
implementation 'com.google.code.gson:gson:2.8.2'
implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.slf4j:slf4j-simple:1.7.25'
// implementation project(':beam-commons')
implementation project(':beam-commons')
implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
// compile group: 'org.apache.beam', name: 'beam-runners-flink-1.12', version: '2.27.0'
......
......@@ -3,6 +3,7 @@ rootProject.name = 'theodolite-benchmarks'
include 'load-generator-commons'
include 'kstreams-commons'
include 'flink-commons'
include 'beam-commons'
include 'uc1-load-generator'
include 'uc1-kstreams'
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment