Commit 5321cfd2 authored by Lorenz Boguhn

merged theodolit-kotlin

parents 08e610ab f136cc65
4 related merge requests: !159 Re-implementation of Theodolite with Kotlin/Quarkus, !157 Update Graal Image in CI pipeline, !108 Feature/185 Make Paths Configurable, !83 WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing 681 additions and 44 deletions
@@ -32,7 +32,7 @@ services:
     environment:
       SCHEMA_REGISTRY_HOST_NAME: schema-registry
       SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
-  uc-app:
+  benchmark:
     image: ghcr.io/cau-se/theodolite-uc3-kstreams-app:latest
     depends_on:
       - schema-registry
@@ -40,7 +40,7 @@ services:
     environment:
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
-  uc-wg:
+  load-generator:
     image: ghcr.io/cau-se/theodolite-uc3-workload-generator:latest
     depends_on:
       - schema-registry
...
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    expose:
      - "2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: wurstmeister/kafka
    expose:
      - "9092"
    #ports:
    #  - 19092:19092
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONNECTIONS_FROM_HOST://:19092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000
      KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1"
  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.1
    depends_on:
      - zookeeper
      - kafka
    expose:
      - "8081"
    #ports:
    #  - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
  load-generator:
    image: ghcr.io/cau-se/theodolite-uc4-workload-generator:latest
    depends_on:
      - schema-registry
      - kafka
    environment:
      BOOTSTRAP_SERVER: load-generator:5701
      PORT: 5701
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      SCHEMA_REGISTRY_URL: http://schema-registry:8081
      NUM_SENSORS: 4
      NUM_NESTED_GROUPS: 4
  benchmark-jobmanager:
    image: ghcr.io/cau-se/theodolite-uc4-flink:latest
    ports:
      - "8080:8081"
    command: standalone-job --job-classname theodolite.uc4.application.AggregationServiceFlinkJob
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
      - SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: benchmark-jobmanager
        parallelism.default: 1
    depends_on:
      - schema-registry
      - kafka
  benchmark-taskmanager:
    image: ghcr.io/cau-se/theodolite-uc4-flink:latest
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: benchmark-jobmanager
    depends_on:
      - schema-registry
      - kafka
@@ -31,7 +31,7 @@ services:
     environment:
       SCHEMA_REGISTRY_HOST_NAME: schema-registry
       SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
-  uc-app:
+  benchmark:
     image: ghcr.io/cau-se/theodolite-uc4-kstreams-app:latest
     depends_on:
       - schema-registry
@@ -39,7 +39,7 @@ services:
     environment:
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
-  uc-wg:
+  load-generator:
     image: ghcr.io/cau-se/theodolite-uc4-workload-generator:latest
     depends_on:
       - schema-registry
...
cleanup.add_default_serial_version_id=true
cleanup.add_generated_serial_version_id=false
cleanup.add_missing_annotations=true
cleanup.add_missing_deprecated_annotations=true
cleanup.add_missing_methods=false
cleanup.add_missing_nls_tags=false
cleanup.add_missing_override_annotations=true
cleanup.add_missing_override_annotations_interface_methods=true
cleanup.add_serial_version_id=false
cleanup.always_use_blocks=true
cleanup.always_use_parentheses_in_expressions=false
cleanup.always_use_this_for_non_static_field_access=true
cleanup.always_use_this_for_non_static_method_access=true
cleanup.convert_functional_interfaces=false
cleanup.convert_to_enhanced_for_loop=true
cleanup.correct_indentation=true
cleanup.format_source_code=true
cleanup.format_source_code_changes_only=false
cleanup.insert_inferred_type_arguments=false
cleanup.make_local_variable_final=true
cleanup.make_parameters_final=true
cleanup.make_private_fields_final=true
cleanup.make_type_abstract_if_missing_method=false
cleanup.make_variable_declarations_final=true
cleanup.never_use_blocks=false
cleanup.never_use_parentheses_in_expressions=true
cleanup.organize_imports=true
cleanup.qualify_static_field_accesses_with_declaring_class=false
cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
cleanup.remove_trailing_whitespaces_ignore_empty=false
cleanup.remove_unnecessary_casts=true
cleanup.remove_unnecessary_nls_tags=true
cleanup.remove_unused_imports=true
cleanup.remove_unused_local_variables=false
cleanup.remove_unused_private_fields=true
cleanup.remove_unused_private_members=false
cleanup.remove_unused_private_methods=true
cleanup.remove_unused_private_types=true
cleanup.sort_members=false
cleanup.sort_members_all=false
cleanup.use_anonymous_class_creation=false
cleanup.use_blocks=true
cleanup.use_blocks_only_for_return_and_throw=false
cleanup.use_lambda=true
cleanup.use_parentheses_in_expressions=true
cleanup.use_this_for_non_static_field_access=true
cleanup.use_this_for_non_static_field_access_only_if_necessary=false
cleanup.use_this_for_non_static_method_access=true
cleanup.use_this_for_non_static_method_access_only_if_necessary=false
cleanup_profile=_CAU-SE-Style
cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99
org.eclipse.jdt.ui.text.custom_code_templates=
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
sp_cleanup.add_missing_deprecated_annotations=true
sp_cleanup.add_missing_methods=false
sp_cleanup.add_missing_nls_tags=false
sp_cleanup.add_missing_override_annotations=true
sp_cleanup.add_missing_override_annotations_interface_methods=true
sp_cleanup.add_serial_version_id=false
sp_cleanup.always_use_blocks=true
sp_cleanup.always_use_parentheses_in_expressions=false
sp_cleanup.always_use_this_for_non_static_field_access=true
sp_cleanup.always_use_this_for_non_static_method_access=true
sp_cleanup.convert_functional_interfaces=false
sp_cleanup.convert_to_enhanced_for_loop=true
sp_cleanup.correct_indentation=true
sp_cleanup.format_source_code=true
sp_cleanup.format_source_code_changes_only=false
sp_cleanup.insert_inferred_type_arguments=false
sp_cleanup.make_local_variable_final=true
sp_cleanup.make_parameters_final=true
sp_cleanup.make_private_fields_final=true
sp_cleanup.make_type_abstract_if_missing_method=false
sp_cleanup.make_variable_declarations_final=true
sp_cleanup.never_use_blocks=false
sp_cleanup.never_use_parentheses_in_expressions=true
sp_cleanup.on_save_use_additional_actions=true
sp_cleanup.organize_imports=true
sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_redundant_modifiers=false
sp_cleanup.remove_redundant_semicolons=false
sp_cleanup.remove_redundant_type_arguments=true
sp_cleanup.remove_trailing_whitespaces=true
sp_cleanup.remove_trailing_whitespaces_all=true
sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
sp_cleanup.remove_unnecessary_casts=true
sp_cleanup.remove_unnecessary_nls_tags=true
sp_cleanup.remove_unused_imports=true
sp_cleanup.remove_unused_local_variables=false
sp_cleanup.remove_unused_private_fields=true
sp_cleanup.remove_unused_private_members=false
sp_cleanup.remove_unused_private_methods=true
sp_cleanup.remove_unused_private_types=true
sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false
sp_cleanup.use_anonymous_class_creation=false
sp_cleanup.use_blocks=true
sp_cleanup.use_blocks_only_for_return_and_throw=false
sp_cleanup.use_lambda=true
sp_cleanup.use_parentheses_in_expressions=true
sp_cleanup.use_this_for_non_static_field_access=true
sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
sp_cleanup.use_this_for_non_static_method_access=true
sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
plugins {
  id 'theodolite.java-commons'
}

ext {
  flinkVersion = '1.12.0'
  scalaBinaryVersion = '2.12'
}

repositories {
  jcenter()
  maven {
    url "https://oss.sonatype.org/content/repositories/snapshots/"
  }
  maven {
    url 'https://packages.confluent.io/maven/'
  }
}

dependencies {
  // Special version required because of https://issues.apache.org/jira/browse/FLINK-13703
  implementation('org.industrial-devops:titan-ccp-common:0.1.0-flink-ready-SNAPSHOT') { changing = true }
  implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
  implementation 'com.google.guava:guava:30.1-jre'
  compile group: 'org.apache.flink', name: "flink-connector-kafka_${scalaBinaryVersion}", version: "${flinkVersion}"
  compile group: 'org.apache.flink', name: "flink-statebackend-rocksdb_${scalaBinaryVersion}", version: "${flinkVersion}"
  compile group: 'org.apache.flink', name: "flink-runtime_${scalaBinaryVersion}", version: "${flinkVersion}"
  compile group: 'org.apache.flink', name: 'flink-java', version: "${flinkVersion}"
  compile group: 'org.apache.flink', name: "flink-streaming-java_${scalaBinaryVersion}", version: "${flinkVersion}"

  // Use JUnit test framework
  testImplementation 'junit:junit:4.12'
}
package theodolite.commons.flink;

/**
 * Keys to access configuration parameters.
 */
public final class ConfigurationKeys {

  public static final String FLINK_STATE_BACKEND = "flink.state.backend";

  public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";

  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD
      "flink.state.backend.memory.size";

  public static final String FLINK_CHECKPOINTING = "checkpointing";

  private ConfigurationKeys() {}

}
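
A minimal usage sketch for these keys (not part of this commit), assuming a Commons Configuration 2 instance; the class name and default values below are illustrative only:

import org.apache.commons.configuration2.BaseConfiguration;
import org.apache.commons.configuration2.Configuration;
import theodolite.commons.flink.ConfigurationKeys;

public class ConfigurationKeysExample {
  public static void main(final String[] args) {
    final Configuration config = new BaseConfiguration();
    config.setProperty(ConfigurationKeys.FLINK_STATE_BACKEND, "rocksdb");
    // Read with fallback defaults, as a benchmark job might.
    final String backend = config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "memory");
    final boolean checkpointing = config.getBoolean(ConfigurationKeys.FLINK_CHECKPOINTING, true);
    System.out.println(backend + ", checkpointing=" + checkpointing);
  }
}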
package theodolite.commons.flink;

import java.time.Duration;
import java.util.Properties;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serde;
import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
import theodolite.commons.flink.util.SerializableSupplier;

/**
 * A class for creating {@link FlinkKafkaConsumer} and {@link FlinkKafkaProducer}.
 */
public class KafkaConnectorFactory {

  private static final Duration PRODUCER_TRANSACTION_TIMEOUT = Duration.ofMinutes(5);

  private final Properties kafkaProps = new Properties();
  private final boolean checkpointingEnabled;
  private final String schemaRegistryUrl;

  /**
   * Create a new {@link KafkaConnectorFactory} from the provided parameters.
   */
  public KafkaConnectorFactory(
      final String appName,
      final String bootstrapServers,
      final boolean checkpointingEnabled,
      final String schemaRegistryUrl) {
    this.checkpointingEnabled = checkpointingEnabled;
    this.schemaRegistryUrl = schemaRegistryUrl;
    this.kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    this.kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, appName);
  }

  /**
   * Create a new {@link FlinkKafkaConsumer} that consumes data using a
   * {@link DeserializationSchema}.
   */
  public <T> FlinkKafkaConsumer<T> createConsumer(final String topic,
      final DeserializationSchema<T> deserializationSchema) {
    return this.createBaseConsumer(
        new FlinkKafkaConsumer<>(topic, deserializationSchema, this.cloneProperties()));
  }

  /**
   * Create a new {@link FlinkKafkaConsumer} that consumes data using a
   * {@link KafkaDeserializationSchema}.
   */
  public <T> FlinkKafkaConsumer<T> createConsumer(final String topic,
      final KafkaDeserializationSchema<T> deserializationSchema) {
    return this.createBaseConsumer(
        new FlinkKafkaConsumer<>(topic, deserializationSchema, this.cloneProperties()));
  }

  /**
   * Create a new {@link FlinkKafkaConsumer} that consumes {@link Tuple2}s using two Kafka
   * {@link Serde}s.
   */
  public <K, V> FlinkKafkaConsumer<Tuple2<K, V>> createConsumer(
      final String topic,
      final SerializableSupplier<Serde<K>> kafkaKeySerde,
      final SerializableSupplier<Serde<V>> kafkaValueSerde,
      final TypeInformation<Tuple2<K, V>> typeInformation) {
    return this.<Tuple2<K, V>>createConsumer(
        topic,
        new FlinkKafkaKeyValueSerde<>(
            topic,
            kafkaKeySerde,
            kafkaValueSerde,
            typeInformation));
  }

  /**
   * Create a new {@link FlinkKafkaConsumer} that consumes from a topic associated with Confluent
   * Schema Registry.
   */
  public <T extends SpecificRecord> FlinkKafkaConsumer<T> createConsumer(final String topic,
      final Class<T> typeClass) {
    // Maybe move to subclass for Confluent-Schema-Registry-specific things
    final DeserializationSchema<T> deserializationSchema =
        ConfluentRegistryAvroDeserializationSchema.forSpecific(typeClass, this.schemaRegistryUrl);
    return this.createConsumer(topic, deserializationSchema);
  }

  private <T> FlinkKafkaConsumer<T> createBaseConsumer(final FlinkKafkaConsumer<T> baseConsumer) {
    baseConsumer.setStartFromGroupOffsets();
    if (this.checkpointingEnabled) {
      baseConsumer.setCommitOffsetsOnCheckpoints(true); // TODO Validate if this is sensible
    }
    baseConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
    return baseConsumer;
  }

  /**
   * Create a new {@link FlinkKafkaProducer} that produces data using a
   * {@link KafkaSerializationSchema}.
   */
  public <T> FlinkKafkaProducer<T> createProducer(final String topic,
      final KafkaSerializationSchema<T> serializationSchema) {
    final Properties producerProps = this.buildProducerProperties();
    return this.createBaseProducer(new FlinkKafkaProducer<>(
        topic, serializationSchema, producerProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
  }

  /**
   * Create a new {@link FlinkKafkaProducer} that produces {@link Tuple2}s using two Kafka
   * {@link Serde}s.
   */
  public <K, V> FlinkKafkaProducer<Tuple2<K, V>> createProducer(
      final String topic,
      final SerializableSupplier<Serde<K>> kafkaKeySerde,
      final SerializableSupplier<Serde<V>> kafkaValueSerde,
      final TypeInformation<Tuple2<K, V>> typeInformation) {
    return this.createProducer(
        topic,
        new FlinkKafkaKeyValueSerde<>(
            topic,
            kafkaKeySerde,
            kafkaValueSerde,
            typeInformation));
  }

  private <T> FlinkKafkaProducer<T> createBaseProducer(final FlinkKafkaProducer<T> baseProducer) {
    baseProducer.setWriteTimestampToKafka(true);
    return baseProducer;
  }

  private Properties buildProducerProperties() {
    final Properties producerProps = this.cloneProperties();
    producerProps.setProperty(
        ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
        String.valueOf(PRODUCER_TRANSACTION_TIMEOUT.toMillis())); // TODO necessary?
    return producerProps;
  }

  private Properties cloneProperties() {
    final Properties props = new Properties();
    props.putAll(this.kafkaProps);
    return props;
  }

}
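
A hedged usage sketch for the factory (not part of this commit): the application name, servers, topic, and the ActivePowerRecord Avro type (assumed to come from the titan-ccp-common dependency declared in the build file above) are placeholders, not values from this merge.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import theodolite.commons.flink.KafkaConnectorFactory;
import titan.ccp.model.records.ActivePowerRecord;

public class KafkaConnectorFactoryExample {
  public static void main(final String[] args) throws Exception {
    // Placeholder application name, bootstrap servers, and topic.
    final KafkaConnectorFactory factory = new KafkaConnectorFactory(
        "example-app", "localhost:9092", true, "http://localhost:8081");
    final FlinkKafkaConsumer<ActivePowerRecord> consumer =
        factory.createConsumer("input", ActivePowerRecord.class);

    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(consumer).print();
    env.execute("kafka-connector-factory-example");
  }
}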
package theodolite.commons.flink;

import java.io.IOException;
import org.apache.commons.configuration2.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Provides factory methods for creating Flink {@link StateBackend}s.
 */
public final class StateBackends {

  public static final String STATE_BACKEND_TYPE_MEMORY = "memory";
  public static final String STATE_BACKEND_TYPE_FILESYSTEM = "filesystem";
  public static final String STATE_BACKEND_TYPE_ROCKSDB = "rocksdb";
  // public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_ROCKSDB;
  public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_MEMORY;
  public static final String DEFAULT_STATE_BACKEND_PATH = "file:///opt/flink/statebackend";

  private static final Logger LOGGER = LoggerFactory.getLogger(StateBackends.class);

  private StateBackends() {}

  /**
   * Create a Flink {@link StateBackend} from a {@link Configuration} and the
   * {@code ConfigurationKeys#FLINK_STATE_BACKEND},
   * {@code ConfigurationKeys#FLINK_STATE_BACKEND_MEMORY_SIZE} and
   * {@code ConfigurationKeys#FLINK_STATE_BACKEND_PATH} configuration keys. Possible options for
   * the {@code ConfigurationKeys#FLINK_STATE_BACKEND} configuration are
   * {@code #STATE_BACKEND_TYPE_ROCKSDB}, {@code #STATE_BACKEND_TYPE_FILESYSTEM} and
   * {@code #STATE_BACKEND_TYPE_MEMORY}, where {@code #STATE_BACKEND_TYPE_MEMORY} is the default.
   */
  public static StateBackend fromConfiguration(final Configuration configuration) {
    final String stateBackendType =
        configuration.getString(ConfigurationKeys.FLINK_STATE_BACKEND, STATE_BACKEND_TYPE_DEFAULT);
    switch (stateBackendType) {
      case STATE_BACKEND_TYPE_MEMORY:
        final int memoryStateBackendSize = configuration.getInt(
            ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
        return new MemoryStateBackend(memoryStateBackendSize);
      case STATE_BACKEND_TYPE_FILESYSTEM:
        final String stateBackendPath = configuration.getString(
            ConfigurationKeys.FLINK_STATE_BACKEND_PATH,
            DEFAULT_STATE_BACKEND_PATH);
        return new FsStateBackend(stateBackendPath);
      case STATE_BACKEND_TYPE_ROCKSDB:
        final String stateBackendPath2 = configuration.getString(
            ConfigurationKeys.FLINK_STATE_BACKEND_PATH,
            DEFAULT_STATE_BACKEND_PATH);
        try {
          return new RocksDBStateBackend(stateBackendPath2, true);
        } catch (final IOException e) {
          LOGGER.error("Cannot create RocksDB state backend.", e);
          throw new IllegalStateException(e);
        }
      default:
        throw new IllegalArgumentException(
            "Unsupported state backend '" + stateBackendType + "' configured.");
    }
  }

}
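
A minimal sketch (not part of this commit) of wiring the factory into a job; the in-memory configuration and path below are purely illustrative, a real job would load its configuration from the environment.

import org.apache.commons.configuration2.BaseConfiguration;
import org.apache.commons.configuration2.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import theodolite.commons.flink.ConfigurationKeys;
import theodolite.commons.flink.StateBackends;

public class StateBackendsExample {
  public static void main(final String[] args) {
    // Illustrative configuration selecting the filesystem backend.
    final Configuration config = new BaseConfiguration();
    config.setProperty(ConfigurationKeys.FLINK_STATE_BACKEND, "filesystem");
    config.setProperty(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "file:///tmp/flink-state");

    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(StateBackends.fromConfiguration(config));
  }
}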
package theodolite.commons.flink;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * Helper methods for creating {@link TypeInformation} for {@link Tuple}s. In contrast to
 * {@code Types#TUPLE(TypeInformation...)}, these methods provide genuine type safety.
 */
public final class TupleType {

  private TupleType() {}

  public static <T1, T2> TypeInformation<Tuple2<T1, T2>> of( // NOPMD
      final TypeInformation<T1> t0,
      final TypeInformation<T2> t1) {
    return Types.TUPLE(t0, t1);
  }

}
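
A short sketch (not part of this commit) showing the type-safety gain; all names are illustrative:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import theodolite.commons.flink.TupleType;

public class TupleTypeExample {
  public static void main(final String[] args) {
    // The element types are checked against the tuple's generic parameters,
    // whereas a direct Types.TUPLE(...) call performs an unchecked cast.
    final TypeInformation<Tuple2<String, Double>> type =
        TupleType.of(Types.STRING, Types.DOUBLE);
    System.out.println(type);
  }
}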
package theodolite.commons.flink.serialization;

import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import theodolite.commons.flink.util.SerializableSupplier;

/**
 * A {@link KafkaSerializationSchema} and {@link KafkaDeserializationSchema} for an arbitrary
 * key-value-pair in Kafka, mapped to/from a Flink {@link Tuple2}.
 *
 * @param <K> Type of the key.
 * @param <V> Type of the value.
 */
public class FlinkKafkaKeyValueSerde<K, V>
    implements KafkaDeserializationSchema<Tuple2<K, V>>, KafkaSerializationSchema<Tuple2<K, V>> {

  private static final long serialVersionUID = 2469569396501933443L; // NOPMD

  private final SerializableSupplier<Serde<K>> keySerdeSupplier;
  private final SerializableSupplier<Serde<V>> valueSerdeSupplier;
  private final String topic;
  private final TypeInformation<Tuple2<K, V>> typeInfo;

  private transient Serde<K> keySerde;
  private transient Serde<V> valueSerde;

  /**
   * Create a new {@link FlinkKafkaKeyValueSerde}.
   */
  public FlinkKafkaKeyValueSerde(final String topic,
      final SerializableSupplier<Serde<K>> keySerdeSupplier,
      final SerializableSupplier<Serde<V>> valueSerdeSupplier,
      final TypeInformation<Tuple2<K, V>> typeInfo) {
    this.topic = topic;
    this.typeInfo = typeInfo;
    this.keySerdeSupplier = keySerdeSupplier;
    this.valueSerdeSupplier = valueSerdeSupplier;
  }

  @Override
  public boolean isEndOfStream(final Tuple2<K, V> nextElement) {
    return false;
  }

  @Override
  public Tuple2<K, V> deserialize(final ConsumerRecord<byte[], byte[]> record) {
    this.ensureInitialized();
    final K key = this.keySerde.deserializer().deserialize(this.topic, record.key());
    final V value = this.valueSerde.deserializer().deserialize(this.topic, record.value());
    return new Tuple2<>(key, value);
  }

  @Override
  public TypeInformation<Tuple2<K, V>> getProducedType() {
    return this.typeInfo;
  }

  @Override
  public ProducerRecord<byte[], byte[]> serialize(final Tuple2<K, V> element,
      @Nullable final Long timestamp) {
    this.ensureInitialized();
    final byte[] key = this.keySerde.serializer().serialize(this.topic, element.f0);
    final byte[] value = this.valueSerde.serializer().serialize(this.topic, element.f1);
    return new ProducerRecord<>(this.topic, key, value);
  }

  private void ensureInitialized() {
    if (this.keySerde == null || this.valueSerde == null) {
      this.keySerde = this.keySerdeSupplier.get();
      this.valueSerde = this.valueSerdeSupplier.get();
    }
  }

}
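
A construction sketch (not part of this commit), assuming Kafka's built-in serdes and a placeholder topic name:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.kafka.common.serialization.Serdes;
import theodolite.commons.flink.TupleType;
import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;

public class FlinkKafkaKeyValueSerdeExample {
  public static void main(final String[] args) {
    // Method references to Kafka's serde factories satisfy SerializableSupplier.
    final FlinkKafkaKeyValueSerde<String, Double> serde =
        new FlinkKafkaKeyValueSerde<>(
            "example-topic",
            Serdes::String,
            Serdes::Double,
            TupleType.of(Types.STRING, Types.DOUBLE));
    System.out.println(serde.getProducedType());
  }
}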
package theodolite.commons.flink.serialization;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.math.Stats;
import java.io.Serializable;

/**
 * Custom Kryo {@link Serializer} for efficient transmission of Guava {@link Stats} objects
 * between Flink instances.
 */
public class StatsSerializer extends Serializer<Stats> implements Serializable {

  private static final long serialVersionUID = -1276866176534267373L; // NOPMD

  @Override
  public void write(final Kryo kryo, final Output output, final Stats object) {
    final byte[] data = object.toByteArray();
    output.writeInt(data.length);
    output.writeBytes(data);
  }

  @Override
  public Stats read(final Kryo kryo, final Input input, final Class<Stats> type) {
    final int numBytes = input.readInt();
    return Stats.fromByteArray(input.readBytes(numBytes));
  }

}
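
A registration sketch (not part of this commit): jobs that ship Guava Stats between operators would register the serializer with the execution config, roughly like this:

import com.google.common.math.Stats;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import theodolite.commons.flink.serialization.StatsSerializer;

public class StatsSerializerExample {
  public static void main(final String[] args) {
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    // Stats values handled by Kryo now use the compact byte-array form above.
    env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
  }
}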
package theodolite.commons.flink.util;

import java.io.Serializable;
import java.util.function.Supplier;

/**
 * Interface for {@link Supplier}s which are serializable.
 *
 * @param <T> the type of results supplied by this supplier
 */
public interface SerializableSupplier<T> extends Supplier<T>, Serializable { // NOPMD
  // Nothing to do here
}
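
A minimal sketch (not part of this commit): because the interface adds Serializable to Supplier, a lambda or method reference assigned to it becomes serializable, which is what the serde suppliers above rely on when Flink ships them to task managers:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import theodolite.commons.flink.util.SerializableSupplier;

public class SerializableSupplierExample {
  public static void main(final String[] args) {
    // The target type makes this method reference Serializable as well.
    final SerializableSupplier<Serde<String>> supplier = Serdes::String;
    System.out.println(supplier.get().getClass().getSimpleName());
  }
}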
No preview for this file type
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-6.7.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.3-bin.zip
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
 #!/usr/bin/env sh
+#
+# Copyright 2015 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 ##############################################################################
 ##
 ##  Gradle start up script for UN*X
@@ -28,7 +44,7 @@ APP_NAME="Gradle"
 APP_BASE_NAME=`basename "$0"`
 # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-DEFAULT_JVM_OPTS='"-Xmx64m"'
+DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
 # Use the maximum available, or set MAX_FD != -1 to use that value.
 MAX_FD="maximum"
@@ -66,6 +82,7 @@ esac
 CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
 # Determine the Java command to use to start the JVM.
 if [ -n "$JAVA_HOME" ] ; then
     if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
@@ -109,10 +126,11 @@ if $darwin; then
     GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
 fi
-# For Cygwin, switch paths to Windows format before running java
-if $cygwin ; then
+# For Cygwin or MSYS, switch paths to Windows format before running java
+if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
     APP_HOME=`cygpath --path --mixed "$APP_HOME"`
     CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
     JAVACMD=`cygpath --unix "$JAVACMD"`
     # We build the pattern for arguments to be converted via cygpath
@@ -138,19 +156,19 @@ if $cygwin ; then
         else
             eval `echo args$i`="\"$arg\""
         fi
-        i=$((i+1))
+        i=`expr $i + 1`
     done
     case $i in
-        (0) set -- ;;
-        (1) set -- "$args0" ;;
-        (2) set -- "$args0" "$args1" ;;
-        (3) set -- "$args0" "$args1" "$args2" ;;
-        (4) set -- "$args0" "$args1" "$args2" "$args3" ;;
-        (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
-        (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
-        (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
-        (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
-        (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
+        0) set -- ;;
+        1) set -- "$args0" ;;
+        2) set -- "$args0" "$args1" ;;
+        3) set -- "$args0" "$args1" "$args2" ;;
+        4) set -- "$args0" "$args1" "$args2" "$args3" ;;
+        5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
+        6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
+        7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
+        8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
+        9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
     esac
 fi
@@ -159,14 +177,9 @@ save () {
     for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
     echo " "
 }
-APP_ARGS=$(save "$@")
+APP_ARGS=`save "$@"`
 # Collect all arguments for the java command, following the shell quoting and substitution rules
 eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
-# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
-if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then
-  cd "$(dirname "$0")"
-fi
 exec "$JAVACMD" "$@"
+@rem
+@rem Copyright 2015 the original author or authors.
+@rem
+@rem Licensed under the Apache License, Version 2.0 (the "License");
+@rem you may not use this file except in compliance with the License.
+@rem You may obtain a copy of the License at
+@rem
+@rem      https://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+@rem
 @if "%DEBUG%" == "" @echo off
 @rem ##########################################################################
 @rem
@@ -13,15 +29,18 @@ if "%DIRNAME%" == "" set DIRNAME=.
 set APP_BASE_NAME=%~n0
 set APP_HOME=%DIRNAME%
+@rem Resolve any "." and ".." in APP_HOME to make it shorter.
+for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
 @rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-set DEFAULT_JVM_OPTS="-Xmx64m"
+set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
 @rem Find java.exe
 if defined JAVA_HOME goto findJavaFromJavaHome
 set JAVA_EXE=java.exe
 %JAVA_EXE% -version >NUL 2>&1
-if "%ERRORLEVEL%" == "0" goto init
+if "%ERRORLEVEL%" == "0" goto execute
 echo.
 echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
@@ -35,7 +54,7 @@ goto fail
 set JAVA_HOME=%JAVA_HOME:"=%
 set JAVA_EXE=%JAVA_HOME%/bin/java.exe
-if exist "%JAVA_EXE%" goto init
+if exist "%JAVA_EXE%" goto execute
 echo.
 echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
@@ -45,28 +64,14 @@ echo location of your Java installation.
 goto fail
-:init
-@rem Get command-line arguments, handling Windows variants
-
-if not "%OS%" == "Windows_NT" goto win9xME_args
-
-:win9xME_args
-@rem Slurp the command line arguments.
-set CMD_LINE_ARGS=
-set _SKIP=2
-
-:win9xME_args_slurp
-if "x%~1" == "x" goto execute
-
-set CMD_LINE_ARGS=%*
-
 :execute
 @rem Setup the command line
 set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
 @rem Execute Gradle
-"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
 :end
 @rem End local scope for the variables with windows NT shell
...