From 37946776fc34565be31e44d445361683da6af91b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Tue, 9 Mar 2021 17:01:45 +0100 Subject: [PATCH] Migrate to Avro records, upgrade to Flink 1.12 --- theodolite-benchmarks/build.gradle | 20 ++- .../flink-commons/build.gradle | 11 +- .../FlinkMonitoringRecordSerde.java | 126 ------------------ .../uc1/application/ConfigurationKeys.java | 2 + .../application/HistoryServiceFlinkJob.java | 21 ++- .../resources/META-INF/application.properties | 2 + .../uc2/application/ConfigurationKeys.java | 2 + .../application/HistoryServiceFlinkJob.java | 30 ++--- .../application/StatsAggregateFunction.java | 2 +- .../resources/META-INF/application.properties | 1 + .../uc3/application/ConfigurationKeys.java | 2 + .../application/HistoryServiceFlinkJob.java | 24 ++-- .../application/StatsAggregateFunction.java | 4 +- .../resources/META-INF/application.properties | 1 + .../AggregationServiceFlinkJob.java | 44 +++--- .../uc4/application/ConfigurationKeys.java | 2 + .../JoinAndDuplicateCoFlatMapFunction.java | 2 +- ...uplicateKeyedBroadcastProcessFunction.java | 2 +- ...ecordAggregationProcessWindowFunction.java | 12 +- .../resources/META-INF/application.properties | 1 + 20 files changed, 94 insertions(+), 217 deletions(-) delete mode 100644 theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkMonitoringRecordSerde.java diff --git a/theodolite-benchmarks/build.gradle b/theodolite-benchmarks/build.gradle index c70ada70c..68a06296f 100644 --- a/theodolite-benchmarks/build.gradle +++ b/theodolite-benchmarks/build.gradle @@ -87,36 +87,32 @@ configure(useCaseApplications) { // Dependencies for all Flink benchmarks (use case applications) configure(useCaseApplicationsFlink) { ext { - flinkVersion = '1.11.0' + flinkVersion = '1.12.0' scalaBinaryVersion = '2.12' } dependencies { - // These dependencies is exported to consumers, that is to say found on their 
compile classpath. - compile('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') { - changing = true - } - api 'net.kieker-monitoring:kieker:1.14'//-SNAPSHOT' - api 'net.sourceforge.teetime:teetime:3.0' - // TODO Upgrade to 0.1.0 + // Special version required because of https://issues.apache.org/jira/browse/FLINK-13703 + implementation('org.industrial-devops:titan-ccp-common:0.1.0-flink-ready-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } // These dependencies are used internally, and not exposed to consumers on their own compile classpath. implementation 'org.apache.kafka:kafka-clients:2.2.0' implementation 'com.google.guava:guava:24.1-jre' - implementation 'org.jctools:jctools-core:2.1.1' implementation 'org.slf4j:slf4j-simple:1.6.1' compile project(':flink-commons') - compile group: 'org.apache.kafka', name: 'kafka-clients', version: "2.2.0" + //compile group: 'org.apache.kafka', name: 'kafka-clients', version: "2.2.0" compile group: 'org.apache.flink', name: 'flink-java', version: "${flinkVersion}" compile group: 'org.apache.flink', name: "flink-streaming-java_${scalaBinaryVersion}", version:"${flinkVersion}" compile group: 'org.apache.flink', name: "flink-table-api-java-bridge_${scalaBinaryVersion}", version: "${flinkVersion}" compile group: 'org.apache.flink', name: "flink-table-planner-blink_${scalaBinaryVersion}", version: "${flinkVersion}" compile group: 'org.apache.flink', name: "flink-connector-kafka_${scalaBinaryVersion}", version: "${flinkVersion}" - compile group: 'org.industrial-devops', name: 'titan-ccp-common', version: '0.0.3-SNAPSHOT' + implementation "org.apache.flink:flink-avro:${flinkVersion}" + implementation "org.apache.flink:flink-avro-confluent-registry:${flinkVersion}" compile group: 'org.apache.flink', name: "flink-runtime-web_${scalaBinaryVersion}", version: "${flinkVersion}" // TODO: remove after development compile group: 'org.apache.flink', name: 
"flink-statebackend-rocksdb_${scalaBinaryVersion}", version: "${flinkVersion}" - compile group: 'org.apache.flink', name: 'flink-metrics-prometheus_2.12', version: '1.11.1' + compile group: 'org.apache.flink', name: "flink-metrics-prometheus_${scalaBinaryVersion}", version: "${flinkVersion}" // Use JUnit test framework testImplementation 'junit:junit:4.12' diff --git a/theodolite-benchmarks/flink-commons/build.gradle b/theodolite-benchmarks/flink-commons/build.gradle index 79956c439..26b3a68ff 100644 --- a/theodolite-benchmarks/flink-commons/build.gradle +++ b/theodolite-benchmarks/flink-commons/build.gradle @@ -1,15 +1,12 @@ ext { - flinkVersion = '1.11.0' + flinkVersion = '1.12.0' scalaBinaryVersion = '2.12' } dependencies { - api 'org.apache.kafka:kafka-clients:2.2.0' - // implementation 'org.slf4j:slf4j-simple:1.6.1' - implementation('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') { - changing = true - // exclude group: 'net.kieker-monitoring', module: 'kieker' - } + // Special version required because of https://issues.apache.org/jira/browse/FLINK-13703 + implementation('org.industrial-devops:titan-ccp-common:0.1.0-flink-ready-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'com.google.guava:guava:30.1-jre' compile group: 'org.apache.flink', name: "flink-connector-kafka_${scalaBinaryVersion}", version: "${flinkVersion}" compile group: 'org.apache.flink', name: 'flink-java', version: "${flinkVersion}" diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkMonitoringRecordSerde.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkMonitoringRecordSerde.java deleted file mode 100644 index 8bca461e4..000000000 --- a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkMonitoringRecordSerde.java +++ /dev/null @@ -1,126
+0,0 @@ -package theodolite.commons.flink.serialization; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import kieker.common.record.IMonitoringRecord; -import kieker.common.record.factory.IRecordFactory; -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; -import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; -import titan.ccp.models.records.ActivePowerRecord; -import titan.ccp.models.records.AggregatedActivePowerRecord; - -import javax.annotation.Nullable; -import java.lang.reflect.InvocationTargetException; - -/** - * This class wraps the serializer and deserializer implementations for {@link IMonitoringRecord} - * from {@link IMonitoringRecordSerde} - * into the Flink {@link DeserializationSchema} and {@link SerializationSchema} - * and Kryo {@link Serializer} interfaces. - * It is used for serialization to and from Kafka as well as internal serialization - * between Flink instances. - * This class is also itself serializable by Flink. 
- * @param <R> The specific record type that extends {@link IMonitoringRecord} - * @param <F> The specific record factory type that extends {@link IRecordFactory<R>} - */ -public class FlinkMonitoringRecordSerde<R extends IMonitoringRecord, F extends IRecordFactory<R>> - extends Serializer<R> - implements KafkaDeserializationSchema<R>, - KafkaSerializationSchema<R> { - - private static final long serialVersionUID = -5687951056995646212L; - - private final String topic; - private transient Serde<String> keySerde; - private transient Serde<R> serde; - - private final Class<R> recordClass; - private final Class<F> recordFactoryClass; - - /** - * Creates a new FlinkMonitoringRecordSerde. - * @param topic - * The Kafka topic to/from which to serialize/deserialize. - * @param recordClass - * The class of the serialized/deserialized record. - * @param recordFactoryClass - * The class of the factory for the serialized/deserialized record. - */ - public FlinkMonitoringRecordSerde(final String topic, - final Class<R> recordClass, - final Class<F> recordFactoryClass) { - this.topic = topic; - this.recordClass = recordClass; - this.recordFactoryClass = recordFactoryClass; - } - - @Override - public boolean isEndOfStream(final R nextElement) { - return false; - } - - @Override - public TypeInformation<R> getProducedType() { - return TypeExtractor.getForClass(recordClass); - } - - @Override - public R deserialize(ConsumerRecord<byte[], byte[]> record) { - ensureInitialized(); - return this.serde.deserializer().deserialize(this.topic, record.value()); - } - - @Override - public ProducerRecord<byte[], byte[]> serialize(R element, @Nullable Long timestamp) { - ensureInitialized(); - String identifier = null; - if (element instanceof ActivePowerRecord) { - identifier = ((ActivePowerRecord) element).getIdentifier(); - } - if (element instanceof AggregatedActivePowerRecord) { - identifier = ((AggregatedActivePowerRecord) element).getIdentifier(); - } - final byte[] key = 
this.keySerde.serializer().serialize(this.topic, identifier); - final byte[] value = this.serde.serializer().serialize(this.topic, element); - return new ProducerRecord<>(this.topic, key, value); - } - - private void ensureInitialized() { - if (this.keySerde == null || this.serde == null) { - try { - this.keySerde = Serdes.String(); - this.serde = IMonitoringRecordSerde.serde( - recordFactoryClass.getDeclaredConstructor().newInstance()); - } catch (NoSuchMethodException | InstantiationException | IllegalAccessException - | InvocationTargetException e) { - e.printStackTrace(); - } - } - } - - @Override - public void write(final Kryo kryo, final Output output, final R record) { - ensureInitialized(); - final byte[] data = this.serde.serializer().serialize(this.topic, record); - output.writeInt(data.length); - output.writeBytes(data); - } - - @Override - public R read(final Kryo kryo, final Input input, final Class<R> type) { - ensureInitialized(); - final int numBytes = input.readInt(); - return this.serde.deserializer().deserialize(this.topic, input.readBytes(numBytes)); - } -} diff --git a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java index 3d1de4085..ed961bab7 100644 --- a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java +++ b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java @@ -15,6 +15,8 @@ public final class ConfigurationKeys { public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; + public static final String CHECKPOINTING = "checkpointing"; private ConfigurationKeys() {} diff --git 
a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java index 931436543..24a60f535 100644 --- a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java @@ -1,13 +1,13 @@ package theodolite.uc1.application; import org.apache.commons.configuration2.Configuration; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import theodolite.commons.flink.serialization.FlinkMonitoringRecordSerde; import titan.ccp.common.configuration.Configurations; -import titan.ccp.models.records.ActivePowerRecord; -import titan.ccp.models.records.ActivePowerRecordFactory; +import titan.ccp.model.records.ActivePowerRecord; import java.util.Properties; @@ -25,16 +25,23 @@ public class HistoryServiceFlinkJob { final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS); final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); + final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final Properties kafkaProps = new Properties(); kafkaProps.setProperty("bootstrap.servers", kafkaBroker); kafkaProps.setProperty("group.id", applicationId); - final 
FlinkMonitoringRecordSerde<ActivePowerRecord, ActivePowerRecordFactory> serde = - new FlinkMonitoringRecordSerde<>(inputTopic, - ActivePowerRecord.class, - ActivePowerRecordFactory.class); + /* + * final DeserializationSchema<ActivePowerRecord> serde = new + * FlinkMonitoringRecordSerde<>(inputTopic, ActivePowerRecord.class, + * ActivePowerRecordFactory.class); + */ + + final DeserializationSchema<ActivePowerRecord> serde = + ConfluentRegistryAvroDeserializationSchema.forSpecific( + ActivePowerRecord.class, + schemaRegistryUrl); final FlinkKafkaConsumer<ActivePowerRecord> kafkaConsumer = new FlinkKafkaConsumer<>(inputTopic, serde, kafkaProps); diff --git a/theodolite-benchmarks/uc1-application-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc1-application-flink/src/main/resources/META-INF/application.properties index eee4ea94a..905e501b8 100644 --- a/theodolite-benchmarks/uc1-application-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc1-application-flink/src/main/resources/META-INF/application.properties @@ -5,6 +5,8 @@ kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output +schema.registry.url=http://localhost:8081 + num.threads=1 commit.interval.ms=1000 cache.max.bytes.buffering=-1 diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java index b87f9b840..72e07ffee 100644 --- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java +++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java @@ -14,6 +14,8 @@ public final class ConfigurationKeys { public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + 
+ public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java index b26b8d60e..4ae68be71 100644 --- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java @@ -4,11 +4,16 @@ import com.google.common.math.Stats; import org.apache.commons.configuration2.Configuration; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; +import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema; +import org.apache.flink.formats.avro.typeutils.AvroSerializer; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -18,15 +23,15 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo import org.apache.flink.streaming.api.windowing.time.Time; import 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; import org.apache.kafka.common.serialization.Serdes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde; -import theodolite.commons.flink.serialization.FlinkMonitoringRecordSerde; import theodolite.commons.flink.serialization.StatsSerializer; +import theodolite.uc2.application.ConfigurationKeys; import titan.ccp.common.configuration.Configurations; -import titan.ccp.models.records.ActivePowerRecord; -import titan.ccp.models.records.ActivePowerRecordFactory; +import titan.ccp.model.records.ActivePowerRecord; import java.io.IOException; import java.util.Properties; @@ -49,6 +54,7 @@ public class HistoryServiceFlinkJob { final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final int windowDuration = this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES); final String stateBackend = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase(); final String stateBackendPath = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend"); @@ -58,12 +64,11 @@ public class HistoryServiceFlinkJob { final Properties kafkaProps = new Properties(); kafkaProps.setProperty("bootstrap.servers", kafkaBroker); kafkaProps.setProperty("group.id", applicationId); - - final FlinkMonitoringRecordSerde<ActivePowerRecord, ActivePowerRecordFactory> sourceSerde = - new FlinkMonitoringRecordSerde<>( - inputTopic, - ActivePowerRecord.class, - 
ActivePowerRecordFactory.class); + + final DeserializationSchema<ActivePowerRecord> sourceSerde = + ConfluentRegistryAvroDeserializationSchema.forSpecific( + ActivePowerRecord.class, + schemaRegistryUrl); final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>( inputTopic, sourceSerde, kafkaProps); @@ -73,7 +78,7 @@ public class HistoryServiceFlinkJob { kafkaSource.setCommitOffsetsOnCheckpoints(true); kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()); - final FlinkKafkaKeyValueSerde<String, String> sinkSerde = + final KafkaSerializationSchema<Tuple2<String, String>> sinkSerde = new FlinkKafkaKeyValueSerde<>(outputTopic, Serdes::String, Serdes::String, @@ -103,11 +108,6 @@ public class HistoryServiceFlinkJob { env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize)); } - env.getConfig().registerTypeWithKryoSerializer(ActivePowerRecord.class, - new FlinkMonitoringRecordSerde<>( - inputTopic, - ActivePowerRecord.class, - ActivePowerRecordFactory.class)); env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer()); env.getConfig().getRegisteredTypesWithKryoSerializers().forEach((c, s) -> diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsAggregateFunction.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsAggregateFunction.java index b5bc50467..5c90d9bb7 100644 --- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsAggregateFunction.java +++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsAggregateFunction.java @@ -4,7 +4,7 @@ import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; import org.apache.flink.api.common.functions.AggregateFunction; import theodolite.uc2.application.util.StatsFactory; -import titan.ccp.models.records.ActivePowerRecord; +import 
titan.ccp.model.records.ActivePowerRecord; /** * Statistical aggregation of {@link ActivePowerRecord}s using {@link Stats}. diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc2-application-flink/src/main/resources/META-INF/application.properties index 171eec7cf..f97139098 100644 --- a/theodolite-benchmarks/uc2-application-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc2-application-flink/src/main/resources/META-INF/application.properties @@ -4,6 +4,7 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output +schema.registry.url=http://localhost:8081 num.threads=1 commit.interval.ms=100 cache.max.bytes.buffering=-1 diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java index 1604ec73e..00e920289 100644 --- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java +++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java @@ -14,6 +14,8 @@ public final class ConfigurationKeys { public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java index c3f4fd21d..900c37514 100644 --- 
a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java @@ -4,11 +4,13 @@ import com.google.common.math.Stats; import org.apache.commons.configuration2.Configuration; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -22,15 +24,14 @@ import org.apache.kafka.common.serialization.Serdes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde; -import theodolite.commons.flink.serialization.FlinkMonitoringRecordSerde; import theodolite.commons.flink.serialization.StatsSerializer; +import theodolite.uc3.application.ConfigurationKeys; import theodolite.uc3.application.util.HourOfDayKey; import theodolite.uc3.application.util.HourOfDayKeyFactory; import theodolite.uc3.application.util.HourOfDayKeySerde; import theodolite.uc3.application.util.StatsKeyFactory; import titan.ccp.common.configuration.Configurations; -import titan.ccp.models.records.ActivePowerRecord; -import titan.ccp.models.records.ActivePowerRecordFactory; +import titan.ccp.model.records.ActivePowerRecord; import java.io.IOException; import 
java.time.Instant; @@ -57,6 +58,7 @@ public class HistoryServiceFlinkJob { final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final String timeZoneString = this.config.getString(ConfigurationKeys.TIME_ZONE); final ZoneId timeZone = ZoneId.of(timeZoneString); final Time aggregationDuration = Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)); @@ -71,12 +73,11 @@ public class HistoryServiceFlinkJob { kafkaProps.setProperty("group.id", applicationId); // Sources and Sinks with Serializer and Deserializer - - final FlinkMonitoringRecordSerde<ActivePowerRecord, ActivePowerRecordFactory> sourceSerde = - new FlinkMonitoringRecordSerde<>( - inputTopic, - ActivePowerRecord.class, - ActivePowerRecordFactory.class); + + final DeserializationSchema<ActivePowerRecord> sourceSerde = + ConfluentRegistryAvroDeserializationSchema.forSpecific( + ActivePowerRecord.class, + schemaRegistryUrl); final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>( inputTopic, sourceSerde, kafkaProps); @@ -120,11 +121,6 @@ public class HistoryServiceFlinkJob { // Kryo serializer registration env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class, new HourOfDayKeySerde()); - env.getConfig().registerTypeWithKryoSerializer(ActivePowerRecord.class, - new FlinkMonitoringRecordSerde<>( - inputTopic, - ActivePowerRecord.class, - ActivePowerRecordFactory.class)); env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer()); env.getConfig().getRegisteredTypesWithKryoSerializers().forEach((c, s) -> diff --git 
a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/StatsAggregateFunction.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/StatsAggregateFunction.java index 65af4f451..277f92b50 100644 --- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/StatsAggregateFunction.java +++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/StatsAggregateFunction.java @@ -4,7 +4,7 @@ import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; import org.apache.flink.api.common.functions.AggregateFunction; import theodolite.uc3.application.util.StatsFactory; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; /** * Statistical aggregation of {@link ActivePowerRecord}s using {@link Stats}. @@ -36,4 +36,4 @@ public class StatsAggregateFunction implements AggregateFunction<ActivePowerReco statsAccumulator.addAll(b); return statsAccumulator.snapshot(); } -} \ No newline at end of file +} diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc3-application-flink/src/main/resources/META-INF/application.properties index 4687602c2..6b6874674 100644 --- a/theodolite-benchmarks/uc3-application-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc3-application-flink/src/main/resources/META-INF/application.properties @@ -4,6 +4,7 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output +schema.registry.url=http://localhost:8081 aggregation.duration.days=30 aggregation.advance.days=1 num.threads=1 diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java 
b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java index e56e67573..bb835eddb 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java @@ -1,13 +1,17 @@ package theodolite.uc4.application; +import org.apache.avro.specific.SpecificRecord; import org.apache.commons.configuration2.Configuration; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; +import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -17,25 +21,24 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde; -import 
theodolite.commons.flink.serialization.FlinkMonitoringRecordSerde; +import theodolite.uc4.application.ConfigurationKeys; import theodolite.uc4.application.util.ImmutableSensorRegistrySerializer; import theodolite.uc4.application.util.ImmutableSetSerializer; import theodolite.uc4.application.util.SensorParentKey; import theodolite.uc4.application.util.SensorParentKeySerializer; import titan.ccp.common.configuration.Configurations; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; import titan.ccp.configuration.events.Event; import titan.ccp.configuration.events.EventSerde; import titan.ccp.model.sensorregistry.ImmutableSensorRegistry; import titan.ccp.model.sensorregistry.SensorRegistry; -import titan.ccp.models.records.ActivePowerRecord; -import titan.ccp.models.records.ActivePowerRecordFactory; -import titan.ccp.models.records.AggregatedActivePowerRecord; -import titan.ccp.models.records.AggregatedActivePowerRecordFactory; +import titan.ccp.model.records.ActivePowerRecord; +import titan.ccp.model.records.AggregatedActivePowerRecord; import java.io.IOException; import java.time.Duration; @@ -60,6 +63,7 @@ public class AggregationServiceFlinkJob { final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final Time windowSize = Time.milliseconds(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS)); final Duration windowGrace = Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS)); final String configurationTopic = this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC); @@ -75,11 +79,11 @@ public class AggregationServiceFlinkJob { // Sources and Sinks 
with Serializer and Deserializer - // Source from input topic with ActivePowerRecords - final FlinkMonitoringRecordSerde<ActivePowerRecord, ActivePowerRecordFactory> inputSerde = - new FlinkMonitoringRecordSerde<>(inputTopic, - ActivePowerRecord.class, - ActivePowerRecordFactory.class); + // Source from input topic with ActivePowerRecords + final DeserializationSchema<ActivePowerRecord> inputSerde = + ConfluentRegistryAvroDeserializationSchema.forSpecific( + ActivePowerRecord.class, + schemaRegistryUrl); final FlinkKafkaConsumer<ActivePowerRecord> kafkaInputSource = new FlinkKafkaConsumer<>( inputTopic, inputSerde, kafkaProps); @@ -90,10 +94,10 @@ public class AggregationServiceFlinkJob { } // Source from output topic with AggregatedPowerRecords - final FlinkMonitoringRecordSerde<AggregatedActivePowerRecord, AggregatedActivePowerRecordFactory> outputSerde = - new FlinkMonitoringRecordSerde<>(inputTopic, - AggregatedActivePowerRecord.class, - AggregatedActivePowerRecordFactory.class); + final DeserializationSchema<AggregatedActivePowerRecord> outputSerde = + ConfluentRegistryAvroDeserializationSchema.forSpecific( + AggregatedActivePowerRecord.class, + schemaRegistryUrl); final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource = new FlinkKafkaConsumer<>( outputTopic, outputSerde, kafkaProps); @@ -124,7 +128,7 @@ public class AggregationServiceFlinkJob { new FlinkKafkaKeyValueSerde<>( outputTopic, Serdes::String, - () -> IMonitoringRecordSerde.serde(new AggregatedActivePowerRecordFactory()), + () -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(), TypeInformation.of(new TypeHint<Tuple2<String, AggregatedActivePowerRecord>>() { })); @@ -161,12 +165,6 @@ public class AggregationServiceFlinkJob { } // Kryo serializer registration - env.getConfig().registerTypeWithKryoSerializer(ActivePowerRecord.class, - new FlinkMonitoringRecordSerde<>( - inputTopic, - ActivePowerRecord.class, - ActivePowerRecordFactory.class)); - 
env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class, new ImmutableSensorRegistrySerializer()); env.getConfig().registerTypeWithKryoSerializer(SensorParentKey.class, new SensorParentKeySerializer()); @@ -298,8 +296,6 @@ public class AggregationServiceFlinkJob { "AggregatedActivePowerRecord { " + "identifier: " + value.getIdentifier() + ", " + "timestamp: " + value.getTimestamp() + ", " - + "minInW: " + value.getMinInW() + ", " - + "maxInW: " + value.getMaxInW() + ", " + "count: " + value.getCount() + ", " + "sumInW: " + value.getSumInW() + ", " + "avgInW: " + value.getAverageInW() + " }"; diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java index a5be34aeb..d0682a0fa 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java @@ -15,6 +15,8 @@ public final class ConfigurationKeys { public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; public static final String WINDOW_SIZE_MS = "window.size.ms"; diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java index bd335132b..ceeb46e64 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java +++ 
b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java @@ -10,7 +10,7 @@ import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; import org.apache.flink.util.Collector; import theodolite.uc4.application.util.SensorParentKey; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; import java.util.Set; diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java index ad24310b0..8e4f965b4 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java @@ -10,7 +10,7 @@ import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction import org.apache.flink.util.Collector; import theodolite.uc4.application.util.SensorParentKey; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; import java.util.Set; diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java index db0a47721..ea21899f6 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java @@ -12,8 +12,8 @@ import 
org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import theodolite.uc4.application.util.SensorParentKey; -import titan.ccp.models.records.ActivePowerRecord; -import titan.ccp.models.records.AggregatedActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; +import titan.ccp.model.records.AggregatedActivePowerRecord; public class RecordAggregationProcessWindowFunction extends ProcessWindowFunction<Tuple2<SensorParentKey, ActivePowerRecord>, AggregatedActivePowerRecord, String, TimeWindow> { @@ -46,7 +46,7 @@ public class RecordAggregationProcessWindowFunction extends ProcessWindowFunctio for (Tuple2<SensorParentKey, ActivePowerRecord> t : elements) { AggregatedActivePowerRecord currentAggregate = this.aggregateState.value(); if (currentAggregate == null) { - currentAggregate = new AggregatedActivePowerRecord(key, 0, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, 0, 0, 0); + currentAggregate = new AggregatedActivePowerRecord(key, 0L, 0L, 0.0, 0.0); this.aggregateState.update(currentAggregate); } long count = currentAggregate.getCount(); @@ -55,14 +55,14 @@ public class RecordAggregationProcessWindowFunction extends ProcessWindowFunctio ActivePowerRecord newRecord = t.f1; if (newRecord == null) { // sensor was deleted -> decrease count, set newRecord to zero count--; - newRecord = new ActivePowerRecord(sensorParentKey.getSensor(), 0, 0.0); + newRecord = new ActivePowerRecord(sensorParentKey.getSensor(), 0L, 0.0); } // get last value of this record from state or create 0 valued record ActivePowerRecord previousRecord = this.lastValueState.get(sensorParentKey); if (previousRecord == null) { // sensor was added -> increase count count++; - previousRecord = new ActivePowerRecord(sensorParentKey.getSensor(), 0, 0.0); + previousRecord = new ActivePowerRecord(sensorParentKey.getSensor(), 0L, 0.0); } // if incoming record is older than the last saved record, skip the record @@ -78,8 +78,6 @@ public class 
RecordAggregationProcessWindowFunction extends ProcessWindowFunctio AggregatedActivePowerRecord newAggregate = new AggregatedActivePowerRecord( sensorParentKey.getParent(), timestamp, - Math.min(currentAggregate.getMinInW(), newRecord.getValueInW()), - Math.max(currentAggregate.getMaxInW(), newRecord.getValueInW()), count, sumInW, avgInW diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-application-flink/src/main/resources/META-INF/application.properties index 571a090c3..de85fdb88 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc4-application-flink/src/main/resources/META-INF/application.properties @@ -8,6 +8,7 @@ configuration.kafka.topic=configuration kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output +schema.registry.url=http://localhost:8081 window.size.ms=1000 window.grace.ms=0 num.threads=1 -- GitLab