Commit 4e2d6790 authored by Sören Henning

Merge branch 'master' into stu202077/spesb-gradle-convention

parents 917f7a8c e979a39e
1 merge request: !95 Introduce Gradle convention plugins
Showing 662 additions and 45 deletions
@@ -38,6 +38,7 @@ build-benchmarks:
   artifacts:
     paths:
       - "theodolite-benchmarks/build/libs/*.jar"
+      - "theodolite-benchmarks/*/build/libs/*.jar"
       - "theodolite-benchmarks/*/build/distributions/*.tar"
     expire_in: 1 day
@@ -124,30 +125,54 @@ spotbugs-benchmarks:
   when: manual
   allow_failure: true

-deploy-uc1-kstreams-app:
+deploy-uc1-kstreams:
   extends: .deploy-benchmarks
   variables:
     IMAGE_NAME: "theodolite-uc1-kstreams-app"
     JAVA_PROJECT_NAME: "uc1-application"

-deploy-uc2-kstreams-app:
+deploy-uc2-kstreams:
   extends: .deploy-benchmarks
   variables:
     IMAGE_NAME: "theodolite-uc2-kstreams-app"
     JAVA_PROJECT_NAME: "uc2-application"

-deploy-uc3-kstreams-app:
+deploy-uc3-kstreams:
   extends: .deploy-benchmarks
   variables:
     IMAGE_NAME: "theodolite-uc3-kstreams-app"
     JAVA_PROJECT_NAME: "uc3-application"

-deploy-uc4-kstreams-app:
+deploy-uc4-kstreams:
   extends: .deploy-benchmarks
   variables:
     IMAGE_NAME: "theodolite-uc4-kstreams-app"
     JAVA_PROJECT_NAME: "uc4-application"

+deploy-uc1-flink:
+  extends: .deploy-benchmarks
+  variables:
+    IMAGE_NAME: "theodolite-uc1-flink"
+    JAVA_PROJECT_NAME: "uc1-application-flink"
+
+deploy-uc2-flink:
+  extends: .deploy-benchmarks
+  variables:
+    IMAGE_NAME: "theodolite-uc2-flink"
+    JAVA_PROJECT_NAME: "uc2-application-flink"
+
+deploy-uc3-flink:
+  extends: .deploy-benchmarks
+  variables:
+    IMAGE_NAME: "theodolite-uc3-flink"
+    JAVA_PROJECT_NAME: "uc3-application-flink"
+
+deploy-uc4-flink:
+  extends: .deploy-benchmarks
+  variables:
+    IMAGE_NAME: "theodolite-uc4-flink"
+    JAVA_PROJECT_NAME: "uc4-application-flink"

 deploy-uc1-load-generator:
   extends: .deploy-benchmarks
   variables:
...
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    expose:
      - "2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: wurstmeister/kafka
    expose:
      - "9092"
    #ports:
    #  - 19092:19092
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONNECTIONS_FROM_HOST://:19092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000
      KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1"
  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.1
    depends_on:
      - zookeeper
      - kafka
    expose:
      - "8081"
    #ports:
    #  - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
  load-generator:
    image: ghcr.io/cau-se/theodolite-uc1-workload-generator:latest
    depends_on:
      - schema-registry
      - kafka
    environment:
      BOOTSTRAP_SERVER: load-generator:5701
      PORT: 5701
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      SCHEMA_REGISTRY_URL: http://schema-registry:8081
      NUM_SENSORS: 10
  benchmark-jobmanager:
    image: ghcr.io/cau-se/theodolite-uc1-flink:latest
    ports:
      - "8080:8081"
    command: standalone-job --job-classname theodolite.uc1.application.HistoryServiceFlinkJob
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
      - SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: benchmark-jobmanager
        parallelism.default: 1
    depends_on:
      - schema-registry
      - kafka
  benchmark-taskmanager:
    image: ghcr.io/cau-se/theodolite-uc1-flink:latest
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: benchmark-jobmanager
    depends_on:
      - schema-registry
      - kafka
@@ -31,16 +31,16 @@ services:
     environment:
       SCHEMA_REGISTRY_HOST_NAME: schema-registry
       SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
-  uc-app:
-    image: theodolite/theodolite-uc1-kstreams-app:latest
+  benchmark:
+    image: ghcr.io/cau-se/theodolite-uc1-kstreams-app:latest
     depends_on:
       - schema-registry
       - kafka
     environment:
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
-  uc-wg:
-    image: theodolite/theodolite-uc1-workload-generator:latest
+  load-generator:
+    image: ghcr.io/cau-se/theodolite-uc1-workload-generator:latest
     depends_on:
       - schema-registry
       - kafka
...
version: '2'
services:
  zookeeper:
    #image: wurstmeister/zookeeper
    image: confluentinc/cp-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: wurstmeister/kafka
    expose:
      - "9092"
    #ports:
    #  - 19092:19092
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONNECTIONS_FROM_HOST://:19092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000
      KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1"
  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.1
    depends_on:
      - zookeeper
      - kafka
    #ports:
    #  - "8081:8081"
    expose:
      - "8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
  load-generator:
    image: ghcr.io/cau-se/theodolite-uc2-workload-generator:latest
    depends_on:
      - schema-registry
      - kafka
    environment:
      BOOTSTRAP_SERVER: load-generator:5701
      PORT: 5701
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      SCHEMA_REGISTRY_URL: http://schema-registry:8081
      NUM_SENSORS: 10
  benchmark-jobmanager:
    image: ghcr.io/cau-se/theodolite-uc2-flink:latest
    ports:
      - "8080:8081"
    command: standalone-job --job-classname theodolite.uc2.application.HistoryServiceFlinkJob
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
      - SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: benchmark-jobmanager
        parallelism.default: 1
    depends_on:
      - schema-registry
      - kafka
  benchmark-taskmanager:
    image: ghcr.io/cau-se/theodolite-uc2-flink:latest
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: benchmark-jobmanager
    depends_on:
      - schema-registry
      - kafka
@@ -32,8 +32,8 @@ services:
     environment:
       SCHEMA_REGISTRY_HOST_NAME: schema-registry
       SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
-  uc-app:
-    image: theodolite/theodolite-uc2-kstreams-app:latest
+  benchmark:
+    image: ghcr.io/cau-se/theodolite-uc2-kstreams-app:latest
     depends_on:
       - schema-registry
       - kafka
@@ -41,8 +41,8 @@ services:
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
       KAFKA_WINDOW_DURATION_MINUTES: 60
-  uc-wg:
-    image: theodolite/theodolite-uc2-workload-generator:latest
+  load-generator:
+    image: ghcr.io/cau-se/theodolite-uc2-workload-generator:latest
     depends_on:
       - schema-registry
       - kafka
...
version: '2'
services:
  zookeeper:
    #image: wurstmeister/zookeeper
    image: confluentinc/cp-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: wurstmeister/kafka
    expose:
      - "9092"
    #ports:
    #  - 19092:19092
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONNECTIONS_FROM_HOST://:19092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000
      KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1"
  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.1
    depends_on:
      - zookeeper
      - kafka
    #ports:
    #  - "8081:8081"
    expose:
      - "8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
  load-generator:
    image: ghcr.io/cau-se/theodolite-uc3-workload-generator:latest
    depends_on:
      - schema-registry
      - kafka
    environment:
      BOOTSTRAP_SERVER: load-generator:5701
      PORT: 5701
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      SCHEMA_REGISTRY_URL: http://schema-registry:8081
      NUM_SENSORS: 10
  benchmark-jobmanager:
    image: ghcr.io/cau-se/theodolite-uc3-flink:latest
    ports:
      - "8080:8081"
    command: standalone-job --job-classname theodolite.uc3.application.HistoryServiceFlinkJob
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
      - SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: benchmark-jobmanager
        parallelism.default: 1
    depends_on:
      - schema-registry
      - kafka
  benchmark-taskmanager:
    image: ghcr.io/cau-se/theodolite-uc3-flink:latest
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: benchmark-jobmanager
    depends_on:
      - schema-registry
      - kafka
@@ -32,16 +32,16 @@ services:
     environment:
       SCHEMA_REGISTRY_HOST_NAME: schema-registry
       SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
-  uc-app:
-    image: theodolite/theodolite-uc3-kstreams-app:latest
+  benchmark:
+    image: ghcr.io/cau-se/theodolite-uc3-kstreams-app:latest
     depends_on:
       - schema-registry
       - kafka
     environment:
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
-  uc-wg:
-    image: theodolite/theodolite-uc3-workload-generator:latest
+  load-generator:
+    image: ghcr.io/cau-se/theodolite-uc3-workload-generator:latest
     depends_on:
       - schema-registry
       - kafka
...
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    expose:
      - "2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: wurstmeister/kafka
    expose:
      - "9092"
    #ports:
    #  - 19092:19092
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONNECTIONS_FROM_HOST://:19092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000
      KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1"
  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.1
    depends_on:
      - zookeeper
      - kafka
    expose:
      - "8081"
    #ports:
    #  - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
  load-generator:
    image: ghcr.io/cau-se/theodolite-uc4-workload-generator:latest
    depends_on:
      - schema-registry
      - kafka
    environment:
      BOOTSTRAP_SERVER: load-generator:5701
      PORT: 5701
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      SCHEMA_REGISTRY_URL: http://schema-registry:8081
      NUM_SENSORS: 4
      NUM_NESTED_GROUPS: 4
  benchmark-jobmanager:
    image: ghcr.io/cau-se/theodolite-uc4-flink:latest
    ports:
      - "8080:8081"
    command: standalone-job --job-classname theodolite.uc4.application.AggregationServiceFlinkJob
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
      - SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: benchmark-jobmanager
        parallelism.default: 1
    depends_on:
      - schema-registry
      - kafka
  benchmark-taskmanager:
    image: ghcr.io/cau-se/theodolite-uc4-flink:latest
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: benchmark-jobmanager
    depends_on:
      - schema-registry
      - kafka
@@ -31,16 +31,16 @@ services:
     environment:
       SCHEMA_REGISTRY_HOST_NAME: schema-registry
       SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
-  uc-app:
-    image: theodolite/theodolite-uc4-kstreams-app:latest
+  benchmark:
+    image: ghcr.io/cau-se/theodolite-uc4-kstreams-app:latest
     depends_on:
       - schema-registry
       - kafka
     environment:
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
-  uc-wg:
-    image: theodolite/theodolite-uc4-workload-generator:latest
+  load-generator:
+    image: ghcr.io/cau-se/theodolite-uc4-workload-generator:latest
     depends_on:
       - schema-registry
       - kafka
...
@@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true
 cleanup.qualify_static_method_accesses_with_declaring_class=false
 cleanup.remove_private_constructors=true
 cleanup.remove_redundant_modifiers=false
-cleanup.remove_redundant_semicolons=false
+cleanup.remove_redundant_semicolons=true
 cleanup.remove_redundant_type_arguments=true
 cleanup.remove_trailing_whitespaces=true
 cleanup.remove_trailing_whitespaces_all=true
@@ -66,6 +66,7 @@ org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
 org.eclipse.jdt.ui.staticondemandthreshold=99
+org.eclipse.jdt.ui.text.custom_code_templates=
 sp_cleanup.add_default_serial_version_id=true
 sp_cleanup.add_generated_serial_version_id=false
 sp_cleanup.add_missing_annotations=true
...
@@ -20,7 +20,12 @@ dependencies {
   implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
   implementation 'com.google.guava:guava:30.1-jre'
   compile group: 'org.apache.flink', name: "flink-connector-kafka_${scalaBinaryVersion}", version: "${flinkVersion}"
+  compile group: 'org.apache.flink', name: "flink-statebackend-rocksdb_${scalaBinaryVersion}", version: "${flinkVersion}"
+  compile group: 'org.apache.flink', name: "flink-runtime_${scalaBinaryVersion}", version: "${flinkVersion}"
   compile group: 'org.apache.flink', name: 'flink-java', version: "${flinkVersion}"
+  compile group: 'org.apache.flink', name: "flink-streaming-java_${scalaBinaryVersion}", version: "${flinkVersion}"
+  implementation "org.apache.flink:flink-avro:${flinkVersion}"
+  implementation "org.apache.flink:flink-avro-confluent-registry:${flinkVersion}"

   // Use JUnit test framework
   testImplementation 'junit:junit:4.12'
...
package theodolite.commons.flink;

/**
 * Keys to access configuration parameters.
 */
public final class ConfigurationKeys {

  public static final String FLINK_STATE_BACKEND = "flink.state.backend";

  public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";

  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD
      "flink.state.backend.memory.size";

  public static final String FLINK_CHECKPOINTING = "checkpointing";

  private ConfigurationKeys() {}

}
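For orientation, a minimal sketch of how these keys could be looked up, assuming a commons-configuration2 Configuration obtained elsewhere; ServiceConfigurations comes from the titan-ccp-common dependency and is an assumption here, not part of this commit:

import org.apache.commons.configuration2.Configuration;
import theodolite.commons.flink.ConfigurationKeys;
import titan.ccp.common.configuration.ServiceConfigurations;

public final class ConfigurationKeysExample {

  public static void main(final String[] args) {
    // Hypothetical configuration source; any Configuration instance works.
    final Configuration config = ServiceConfigurations.createWithDefaults();
    // Fall back to the memory backend if flink.state.backend is unset.
    final String backendType =
        config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "memory");
    final boolean checkpointing =
        config.getBoolean(ConfigurationKeys.FLINK_CHECKPOINTING, true);
    System.out.println(backendType + " (checkpointing=" + checkpointing + ")");
  }
}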
package theodolite.commons.flink;

import java.time.Duration;
import java.util.Properties;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serde;
import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
import theodolite.commons.flink.util.SerializableSupplier;

/**
 * A class for creating {@link FlinkKafkaConsumer} and {@link FlinkKafkaProducer}.
 */
public class KafkaConnectorFactory {

  private static final Duration PRODUCER_TRANSACTION_TIMEOUT = Duration.ofMinutes(5);

  private final Properties kafkaProps = new Properties();
  private final boolean checkpointingEnabled;
  private final String schemaRegistryUrl;

  /**
   * Create a new {@link KafkaConnectorFactory} from the provided parameters.
   */
  public KafkaConnectorFactory(
      final String appName,
      final String bootstrapServers,
      final boolean checkpointingEnabled,
      final String schemaRegistryUrl) {
    this.checkpointingEnabled = checkpointingEnabled;
    this.schemaRegistryUrl = schemaRegistryUrl;
    this.kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    this.kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, appName);
  }

  /**
   * Create a new {@link FlinkKafkaConsumer} that consumes data using a
   * {@link DeserializationSchema}.
   */
  public <T> FlinkKafkaConsumer<T> createConsumer(final String topic,
      final DeserializationSchema<T> deserializationSchema) {
    return this.createBaseConsumer(
        new FlinkKafkaConsumer<>(topic, deserializationSchema, this.cloneProperties()));
  }

  /**
   * Create a new {@link FlinkKafkaConsumer} that consumes data using a
   * {@link KafkaDeserializationSchema}.
   */
  public <T> FlinkKafkaConsumer<T> createConsumer(final String topic,
      final KafkaDeserializationSchema<T> deserializationSchema) {
    return this.createBaseConsumer(
        new FlinkKafkaConsumer<>(topic, deserializationSchema, this.cloneProperties()));
  }

  /**
   * Create a new {@link FlinkKafkaConsumer} that consumes {@link Tuple2}s using two Kafka
   * {@link Serde}s.
   */
  public <K, V> FlinkKafkaConsumer<Tuple2<K, V>> createConsumer(
      final String topic,
      final SerializableSupplier<Serde<K>> kafkaKeySerde,
      final SerializableSupplier<Serde<V>> kafkaValueSerde,
      final TypeInformation<Tuple2<K, V>> typeInformation) {
    return this.<Tuple2<K, V>>createConsumer(
        topic,
        new FlinkKafkaKeyValueSerde<>(
            topic,
            kafkaKeySerde,
            kafkaValueSerde,
            typeInformation));
  }

  /**
   * Create a new {@link FlinkKafkaConsumer} that consumes from a topic associated with Confluent
   * Schema Registry.
   */
  public <T extends SpecificRecord> FlinkKafkaConsumer<T> createConsumer(final String topic,
      final Class<T> typeClass) {
    // Maybe move to subclass for Confluent-Schema-Registry-specific things
    final DeserializationSchema<T> deserializationSchema =
        ConfluentRegistryAvroDeserializationSchema.forSpecific(typeClass, this.schemaRegistryUrl);
    return this.createConsumer(topic, deserializationSchema);
  }

  private <T> FlinkKafkaConsumer<T> createBaseConsumer(final FlinkKafkaConsumer<T> baseConsumer) {
    baseConsumer.setStartFromGroupOffsets();
    if (this.checkpointingEnabled) {
      baseConsumer.setCommitOffsetsOnCheckpoints(true); // TODO Validate if this is sensible
    }
    baseConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
    return baseConsumer;
  }

  /**
   * Create a new {@link FlinkKafkaProducer} that produces data using a
   * {@link KafkaSerializationSchema}.
   */
  public <T> FlinkKafkaProducer<T> createProducer(final String topic,
      final KafkaSerializationSchema<T> serializationSchema) {
    final Properties producerProps = this.buildProducerProperties();
    return this.createBaseProducer(new FlinkKafkaProducer<>(
        topic, serializationSchema, producerProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
  }

  /**
   * Create a new {@link FlinkKafkaProducer} that produces {@link Tuple2}s using two Kafka
   * {@link Serde}s.
   */
  public <K, V> FlinkKafkaProducer<Tuple2<K, V>> createProducer(
      final String topic,
      final SerializableSupplier<Serde<K>> kafkaKeySerde,
      final SerializableSupplier<Serde<V>> kafkaValueSerde,
      final TypeInformation<Tuple2<K, V>> typeInformation) {
    return this.createProducer(
        topic,
        new FlinkKafkaKeyValueSerde<>(
            topic,
            kafkaKeySerde,
            kafkaValueSerde,
            typeInformation));
  }

  private <T> FlinkKafkaProducer<T> createBaseProducer(final FlinkKafkaProducer<T> baseProducer) {
    baseProducer.setWriteTimestampToKafka(true);
    return baseProducer;
  }

  private Properties buildProducerProperties() {
    final Properties producerProps = this.cloneProperties();
    producerProps.setProperty(
        ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
        String.valueOf(PRODUCER_TRANSACTION_TIMEOUT.toMillis())); // TODO necessary?
    return producerProps;
  }

  private Properties cloneProperties() {
    final Properties props = new Properties();
    props.putAll(this.kafkaProps);
    return props;
  }
}
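To illustrate how the factory is meant to be used, here is a hedged sketch of a job that consumes Avro records via Confluent Schema Registry; the topic name, connection settings, and job wiring are illustrative assumptions rather than part of this commit:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import theodolite.commons.flink.KafkaConnectorFactory;
import titan.ccp.model.records.ActivePowerRecord;

public final class KafkaConnectorFactoryExample {

  public static void main(final String[] args) throws Exception {
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // Assumed connection settings; real jobs read them from configuration.
    final KafkaConnectorFactory factory = new KafkaConnectorFactory(
        "uc1-application", "kafka:9092", true, "http://schema-registry:8081");

    // Consumes Avro SpecificRecords whose schema is fetched from the registry.
    final FlinkKafkaConsumer<ActivePowerRecord> source =
        factory.createConsumer("input", ActivePowerRecord.class);

    final DataStream<ActivePowerRecord> records = env.addSource(source);
    records.print();

    env.execute("kafka-connector-factory-example");
  }
}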
package theodolite.commons.flink;

import java.io.IOException;
import org.apache.commons.configuration2.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Provides factory methods for creating Flink {@link StateBackend}s.
 */
public final class StateBackends {

  public static final String STATE_BACKEND_TYPE_MEMORY = "memory";
  public static final String STATE_BACKEND_TYPE_FILESYSTEM = "filesystem";
  public static final String STATE_BACKEND_TYPE_ROCKSDB = "rocksdb";
  // public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_ROCKSDB;
  public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_MEMORY;
  public static final String DEFAULT_STATE_BACKEND_PATH = "file:///opt/flink/statebackend";

  private static final Logger LOGGER = LoggerFactory.getLogger(StateBackends.class);

  private StateBackends() {}

  /**
   * Create a Flink {@link StateBackend} from a {@link Configuration} and the
   * {@code ConfigurationKeys#FLINK_STATE_BACKEND},
   * {@code ConfigurationKeys#FLINK_STATE_BACKEND_MEMORY_SIZE} and
   * {@code ConfigurationKeys#FLINK_STATE_BACKEND_PATH} configuration keys. Possible options for
   * the {@code ConfigurationKeys#FLINK_STATE_BACKEND} configuration are
   * {@code #STATE_BACKEND_TYPE_ROCKSDB}, {@code #STATE_BACKEND_TYPE_FILESYSTEM} and
   * {@code #STATE_BACKEND_TYPE_MEMORY}, where {@code #STATE_BACKEND_TYPE_MEMORY} is the default.
   */
  public static StateBackend fromConfiguration(final Configuration configuration) {
    final String stateBackendType =
        configuration.getString(ConfigurationKeys.FLINK_STATE_BACKEND, STATE_BACKEND_TYPE_DEFAULT);
    switch (stateBackendType) {
      case STATE_BACKEND_TYPE_MEMORY:
        final int memoryStateBackendSize = configuration.getInt(
            ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
        return new MemoryStateBackend(memoryStateBackendSize);
      case STATE_BACKEND_TYPE_FILESYSTEM:
        final String stateBackendPath = configuration.getString(
            ConfigurationKeys.FLINK_STATE_BACKEND_PATH,
            DEFAULT_STATE_BACKEND_PATH);
        return new FsStateBackend(stateBackendPath);
      case STATE_BACKEND_TYPE_ROCKSDB:
        final String stateBackendPath2 = configuration.getString(
            ConfigurationKeys.FLINK_STATE_BACKEND_PATH,
            DEFAULT_STATE_BACKEND_PATH);
        try {
          return new RocksDBStateBackend(stateBackendPath2, true);
        } catch (final IOException e) {
          LOGGER.error("Cannot create RocksDB state backend.", e);
          throw new IllegalStateException(e);
        }
      default:
        throw new IllegalArgumentException(
            "Unsupported state backend '" + stateBackendType + "' configured.");
    }
  }

}
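A minimal sketch of applying the selected backend to a streaming job; loading the Configuration via titan-ccp-common's ServiceConfigurations is an assumption here:

import org.apache.commons.configuration2.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import theodolite.commons.flink.StateBackends;
import titan.ccp.common.configuration.ServiceConfigurations;

public final class StateBackendExample {

  public static void main(final String[] args) {
    final Configuration config = ServiceConfigurations.createWithDefaults();
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    // Selects the memory, filesystem or RocksDB backend depending on the
    // flink.state.backend property; memory is the default.
    env.setStateBackend(StateBackends.fromConfiguration(config));
  }
}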
package theodolite.commons.flink;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * Helper methods for creating {@link TypeInformation} for {@link Tuple}s. In contrast to
 * {@code Types#TUPLE(TypeInformation...)}, these methods bring real type safety.
 */
public final class TupleType {

  private TupleType() {}

  public static <T1, T2> TypeInformation<Tuple2<T1, T2>> of( // NOPMD
      final TypeInformation<T1> t0,
      final TypeInformation<T2> t1) {
    return Types.TUPLE(t0, t1);
  }

}
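For example, a hypothetical (String, Double) pair type; with the generic Types.TUPLE(...) the two element types would not be checked at compile time:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import theodolite.commons.flink.TupleType;

public final class TupleTypeExample {

  public static void main(final String[] args) {
    // Assigning this to TypeInformation<Tuple2<String, String>> would be a
    // compile-time error, unlike with the untyped Types.TUPLE(...) variant.
    final TypeInformation<Tuple2<String, Double>> pairType =
        TupleType.of(Types.STRING, Types.DOUBLE);
    System.out.println(pairType);
  }
}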
 package theodolite.commons.flink.serialization;

+import javax.annotation.Nullable;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
@@ -7,25 +8,31 @@ import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serde;
+import theodolite.commons.flink.util.SerializableSupplier;

-import javax.annotation.Nullable;
-
+/**
+ * A {@link KafkaSerializationSchema} and {@link KafkaDeserializationSchema} for an arbitrary
+ * key-value-pair in Kafka, mapped to/from a Flink {@link Tuple2}.
+ *
+ * @param <K> Type of the key.
+ * @param <V> Type of the value.
+ */
 public class FlinkKafkaKeyValueSerde<K, V>
-    implements KafkaDeserializationSchema<Tuple2<K, V>>,
-    KafkaSerializationSchema<Tuple2<K, V>> {
+    implements KafkaDeserializationSchema<Tuple2<K, V>>, KafkaSerializationSchema<Tuple2<K, V>> {

-  private static final long serialVersionUID = 2469569396501933443L;
+  private static final long serialVersionUID = 2469569396501933443L; // NOPMD
+
+  private final SerializableSupplier<Serde<K>> keySerdeSupplier;
+  private final SerializableSupplier<Serde<V>> valueSerdeSupplier;
+  private final String topic;
+  private final TypeInformation<Tuple2<K, V>> typeInfo;

   private transient Serde<K> keySerde;
   private transient Serde<V> valueSerde;

-  private SerializableSupplier<Serde<K>> keySerdeSupplier;
-  private SerializableSupplier<Serde<V>> valueSerdeSupplier;
-  private String topic;
-  private TypeInformation<Tuple2<K,V>> typeInfo;
+  /**
+   * Create a new {@link FlinkKafkaKeyValueSerde}.
+   */
   public FlinkKafkaKeyValueSerde(final String topic,
       final SerializableSupplier<Serde<K>> keySerdeSupplier,
       final SerializableSupplier<Serde<V>> valueSerdeSupplier,
@@ -43,7 +50,7 @@ public class FlinkKafkaKeyValueSerde<K, V>

   @Override
   public Tuple2<K, V> deserialize(final ConsumerRecord<byte[], byte[]> record) {
-    ensureInitialized();
+    this.ensureInitialized();
     final K key = this.keySerde.deserializer().deserialize(this.topic, record.key());
     final V value = this.valueSerde.deserializer().deserialize(this.topic, record.value());
     return new Tuple2<>(key, value);
@@ -55,8 +62,9 @@ public class FlinkKafkaKeyValueSerde<K, V>
   }

   @Override
-  public ProducerRecord<byte[], byte[]> serialize(Tuple2<K, V> element, @Nullable Long timestamp) {
-    ensureInitialized();
+  public ProducerRecord<byte[], byte[]> serialize(final Tuple2<K, V> element,
+      @Nullable final Long timestamp) {
+    this.ensureInitialized();
     final byte[] key = this.keySerde.serializer().serialize(this.topic, element.f0);
     final byte[] value = this.valueSerde.serializer().serialize(this.topic, element.f1);
     return new ProducerRecord<>(this.topic, key, value);
@@ -65,7 +73,8 @@ public class FlinkKafkaKeyValueSerde<K, V>
   private void ensureInitialized() {
     if (this.keySerde == null || this.valueSerde == null) {
       this.keySerde = this.keySerdeSupplier.get();
-      this.valueSerde = this.valueSerdeSupplier.get();;
+      this.valueSerde = this.valueSerdeSupplier.get();
     }
   }
 }
@@ -9,11 +9,11 @@ import com.google.common.math.Stats;
 import java.io.Serializable;

 /**
- * Custom Kryo Serializer for efficient transmission between Flink instances.
+ * Custom Kryo {@link Serializer} for efficient transmission between Flink instances.
  */
 public class StatsSerializer extends Serializer<Stats> implements Serializable {

-  private static final long serialVersionUID = -1276866176534267373L;
+  private static final long serialVersionUID = -1276866176534267373L; // NOPMD

   @Override
   public void write(final Kryo kryo, final Output output, final Stats object) {
...
-package theodolite.commons.flink.serialization;
+package theodolite.commons.flink.util;

 import java.io.Serializable;
 import java.util.function.Supplier;

-public interface SerializableSupplier<T> extends Supplier<T>, Serializable {
-  // here be dragons
+/**
+ * Interface for {@link Supplier}s which are serializable.
+ *
+ * @param <T> the type of results supplied by this supplier
+ */
+public interface SerializableSupplier<T> extends Supplier<T>, Serializable { // NOPMD
+  // Nothing to do here
 }
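A usage sketch: a method reference targeting this interface is itself serializable, so a Serde can be created lazily on each task manager. The String serde is just an illustrative choice:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import theodolite.commons.flink.util.SerializableSupplier;

public final class SerializableSupplierExample {

  public static void main(final String[] args) {
    // The Serde interface does not extend Serializable, but this supplier
    // does, so it can be shipped inside Flink operators.
    final SerializableSupplier<Serde<String>> keySerdeSupplier = Serdes::String;
    final Serde<String> keySerde = keySerdeSupplier.get();
    System.out.println(keySerde.getClass().getName());
  }
}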
-FROM nicobiernat/flink:1.11-scala_2.12-java_11
-ADD build/libs/uc1-application-all.jar /opt/flink/usrlib/artifacts/uc1-application-all.jar
+FROM flink:1.12-scala_2.12-java11
+ADD build/libs/uc1-application-flink-all.jar /opt/flink/usrlib/artifacts/uc1-application-flink-all.jar
package theodolite.uc1.application;

import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import titan.ccp.model.records.ActivePowerRecord;

/**
 * {@link MapFunction} which maps {@link ActivePowerRecord}s to their representation as JSON
 * strings.
 */
public class GsonMapper implements MapFunction<ActivePowerRecord, String> {

  private static final long serialVersionUID = -5263671231838353747L; // NOPMD

  private static final Gson GSON = new Gson();

  @Override
  public String map(final ActivePowerRecord value) throws Exception {
    return GSON.toJson(value);
  }

}
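A hypothetical pipeline fragment showing the mapper in action; the sample record (assuming the Avro-generated constructor taking identifier, timestamp and value in watts) and the local setup are illustrative, not the actual UC1 job:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import theodolite.uc1.application.GsonMapper;
import titan.ccp.model.records.ActivePowerRecord;

public final class GsonMapperExample {

  public static void main(final String[] args) throws Exception {
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env
        .fromElements(new ActivePowerRecord("sensor-1", 0L, 42.0))
        .map(new GsonMapper())
        .print(); // e.g. {"identifier":"sensor-1","timestamp":0,"valueInW":42.0}
    env.execute("gson-mapper-example");
  }
}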