diff --git a/theodolite-benchmarks/build.gradle b/theodolite-benchmarks/build.gradle index c70ada70c414e9c4b99cbde96a2f224880ea9f36..68a06296fe8af44006141b06fc56cd1a94e8f221 100644 --- a/theodolite-benchmarks/build.gradle +++ b/theodolite-benchmarks/build.gradle @@ -87,36 +87,32 @@ configure(useCaseApplications) { // Dependencies for all Flink benchmarks (use case applications) configure(useCaseApplicationsFlink) { ext { - flinkVersion = '1.11.0' + flinkVersion = '1.12.0' scalaBinaryVersion = '2.12' } dependencies { - // These dependencies is exported to consumers, that is to say found on their compile classpath. - compile('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') { - changing = true - } - api 'net.kieker-monitoring:kieker:1.14'//-SNAPSHOT' - api 'net.sourceforge.teetime:teetime:3.0' - // TODO Upgrade to 0.1.0 + // Special version required because of https://issues.apache.org/jira/browse/FLINK-13703 + implementation('org.industrial-devops:titan-ccp-common:0.1.0-flink-ready-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } // These dependencies are used internally, and not exposed to consumers on their own compile classpath. 
implementation 'org.apache.kafka:kafka-clients:2.2.0' implementation 'com.google.guava:guava:24.1-jre' - implementation 'org.jctools:jctools-core:2.1.1' implementation 'org.slf4j:slf4j-simple:1.6.1' compile project(':flink-commons') - compile group: 'org.apache.kafka', name: 'kafka-clients', version: "2.2.0" + //compile group: 'org.apache.kafka', name: 'kafka-clients', version: "2.2.0" compile group: 'org.apache.flink', name: 'flink-java', version: "${flinkVersion}" compile group: 'org.apache.flink', name: "flink-streaming-java_${scalaBinaryVersion}", version:"${flinkVersion}" compile group: 'org.apache.flink', name: "flink-table-api-java-bridge_${scalaBinaryVersion}", version: "${flinkVersion}" compile group: 'org.apache.flink', name: "flink-table-planner-blink_${scalaBinaryVersion}", version: "${flinkVersion}" compile group: 'org.apache.flink', name: "flink-connector-kafka_${scalaBinaryVersion}", version: "${flinkVersion}" - compile group: 'org.industrial-devops', name: 'titan-ccp-common', version: '0.0.3-SNAPSHOT' + implementation "org.apache.flink:flink-avro:${flinkVersion}" + implementation "org.apache.flink:flink-avro-confluent-registry:${flinkVersion}" compile group: 'org.apache.flink', name: "flink-runtime-web_${scalaBinaryVersion}", version: "${flinkVersion}" // TODO: remove after development compile group: 'org.apache.flink', name: "flink-statebackend-rocksdb_${scalaBinaryVersion}", version: "${flinkVersion}" - compile group: 'org.apache.flink', name: 'flink-metrics-prometheus_2.12', version: '1.11.1' + compile group: 'org.apache.flink', name: "flink-metrics-prometheus_${scalaBinaryVersion}", version: "${flinkVersion}" // Use JUnit test framework testImplementation 'junit:junit:4.12' diff --git a/theodolite-benchmarks/flink-commons/build.gradle b/theodolite-benchmarks/flink-commons/build.gradle index 79956c43997e82a88d243de4a55d2a1dcf4aed77..26b3a68ff550d9246ce03c6db7471f739c63f0dc 100644 --- a/theodolite-benchmarks/flink-commons/build.gradle +++ 
b/theodolite-benchmarks/flink-commons/build.gradle @@ -1,15 +1,12 @@ ext { - flinkVersion = '1.11.0' + flinkVersion = '1.12.0' scalaBinaryVersion = '2.12' } dependencies { - api 'org.apache.kafka:kafka-clients:2.2.0' - // implementation 'org.slf4j:slf4j-simple:1.6.1' - implementation('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') { - changing = true - // exclude group: 'net.kieker-monitoring', module: 'kieker' - } + // Special version required because of https://issues.apache.org/jira/browse/FLINK-13703 + implementation('org.industrial-devops:titan-ccp-common:0.1.0-flink-ready-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'com.google.guava:guava:30.1-jre' compile group: 'org.apache.flink', name: "flink-connector-kafka_${scalaBinaryVersion}", version: "${flinkVersion}" compile group: 'org.apache.flink', name: 'flink-java', version: "${flinkVersion}" diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkMonitoringRecordSerde.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkMonitoringRecordSerde.java deleted file mode 100644 index 8bca461e4d6b75aa8340bbba3796c53ceacbb46d..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkMonitoringRecordSerde.java +++ /dev/null @@ -1,126 +0,0 @@ -package theodolite.commons.flink.serialization; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import kieker.common.record.IMonitoringRecord; -import kieker.common.record.factory.IRecordFactory; -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.serialization.SerializationSchema; -import 
org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; -import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; -import titan.ccp.models.records.ActivePowerRecord; -import titan.ccp.models.records.AggregatedActivePowerRecord; - -import javax.annotation.Nullable; -import java.lang.reflect.InvocationTargetException; - -/** - * This class wraps the serializer and deserializer implementations for {@link IMonitoringRecord} - * from {@link IMonitoringRecordSerde} - * into the Flink {@link DeserializationSchema} and {@link SerializationSchema} - * and Kryo {@link Serializer} interfaces. - * It is used for serialization to and from Kafka as well as internal serialization - * between Flink instances. - * This class is also itself serializable by Flink. - * @param <R> The specific record type that extends {@link IMonitoringRecord} - * @param <F> The specific record factory type that extends {@link IRecordFactory<R>} - */ -public class FlinkMonitoringRecordSerde<R extends IMonitoringRecord, F extends IRecordFactory<R>> - extends Serializer<R> - implements KafkaDeserializationSchema<R>, - KafkaSerializationSchema<R> { - - private static final long serialVersionUID = -5687951056995646212L; - - private final String topic; - private transient Serde<String> keySerde; - private transient Serde<R> serde; - - private final Class<R> recordClass; - private final Class<F> recordFactoryClass; - - /** - * Creates a new FlinkMonitoringRecordSerde. - * @param topic - * The Kafka topic to/from which to serialize/deserialize. 
- * @param recordClass - * The class of the serialized/deserialized record. - * @param recordFactoryClass - * The class of the factory for the serialized/deserialized record. - */ - public FlinkMonitoringRecordSerde(final String topic, - final Class<R> recordClass, - final Class<F> recordFactoryClass) { - this.topic = topic; - this.recordClass = recordClass; - this.recordFactoryClass = recordFactoryClass; - } - - @Override - public boolean isEndOfStream(final R nextElement) { - return false; - } - - @Override - public TypeInformation<R> getProducedType() { - return TypeExtractor.getForClass(recordClass); - } - - @Override - public R deserialize(ConsumerRecord<byte[], byte[]> record) { - ensureInitialized(); - return this.serde.deserializer().deserialize(this.topic, record.value()); - } - - @Override - public ProducerRecord<byte[], byte[]> serialize(R element, @Nullable Long timestamp) { - ensureInitialized(); - String identifier = null; - if (element instanceof ActivePowerRecord) { - identifier = ((ActivePowerRecord) element).getIdentifier(); - } - if (element instanceof AggregatedActivePowerRecord) { - identifier = ((AggregatedActivePowerRecord) element).getIdentifier(); - } - final byte[] key = this.keySerde.serializer().serialize(this.topic, identifier); - final byte[] value = this.serde.serializer().serialize(this.topic, element); - return new ProducerRecord<>(this.topic, key, value); - } - - private void ensureInitialized() { - if (this.keySerde == null || this.serde == null) { - try { - this.keySerde = Serdes.String(); - this.serde = IMonitoringRecordSerde.serde( - recordFactoryClass.getDeclaredConstructor().newInstance()); - } catch (NoSuchMethodException | InstantiationException | IllegalAccessException - | InvocationTargetException e) { - e.printStackTrace(); - } - } - } - - @Override - public void write(final Kryo kryo, final Output output, final R record) { - ensureInitialized(); - final byte[] data = this.serde.serializer().serialize(this.topic, 
record); - output.writeInt(data.length); - output.writeBytes(data); - } - - @Override - public R read(final Kryo kryo, final Input input, final Class<R> type) { - ensureInitialized(); - final int numBytes = input.readInt(); - return this.serde.deserializer().deserialize(this.topic, input.readBytes(numBytes)); - } -} diff --git a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java index 3d1de40852a1c3907ac041fbbd1c17a870622405..ed961bab733a409dc07b1be7fa35562103c3e2f4 100644 --- a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java +++ b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java @@ -15,6 +15,8 @@ public final class ConfigurationKeys { public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; + public static final String CHECKPOINTING = "checkpointing"; private ConfigurationKeys() {} diff --git a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java index 931436543bc66e0a295f31dbec6261d209a16f46..24a60f535c5a2bbbdb974f223706c65963778bef 100644 --- a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java @@ -1,13 +1,13 @@ package theodolite.uc1.application; import org.apache.commons.configuration2.Configuration; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import 
org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import theodolite.commons.flink.serialization.FlinkMonitoringRecordSerde; import titan.ccp.common.configuration.Configurations; -import titan.ccp.models.records.ActivePowerRecord; -import titan.ccp.models.records.ActivePowerRecordFactory; +import titan.ccp.model.records.ActivePowerRecord; import java.util.Properties; @@ -25,16 +25,23 @@ public class HistoryServiceFlinkJob { final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS); final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); + final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final Properties kafkaProps = new Properties(); kafkaProps.setProperty("bootstrap.servers", kafkaBroker); kafkaProps.setProperty("group.id", applicationId); - final FlinkMonitoringRecordSerde<ActivePowerRecord, ActivePowerRecordFactory> serde = - new FlinkMonitoringRecordSerde<>(inputTopic, - ActivePowerRecord.class, - ActivePowerRecordFactory.class); + /* + * final DeserializationSchema<ActivePowerRecord> serde = new + * FlinkMonitoringRecordSerde<>(inputTopic, ActivePowerRecord.class, + * ActivePowerRecordFactory.class); + */ + + final DeserializationSchema<ActivePowerRecord> serde = + ConfluentRegistryAvroDeserializationSchema.forSpecific( + ActivePowerRecord.class, + schemaRegistryUrl); final FlinkKafkaConsumer<ActivePowerRecord> kafkaConsumer = new FlinkKafkaConsumer<>(inputTopic, serde, kafkaProps); diff --git 
a/theodolite-benchmarks/uc1-application-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc1-application-flink/src/main/resources/META-INF/application.properties index eee4ea94a16b030a7dedd3daf37757d277fc4003..905e501b8cb66712f2b245470d96803987a9b93b 100644 --- a/theodolite-benchmarks/uc1-application-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc1-application-flink/src/main/resources/META-INF/application.properties @@ -5,6 +5,8 @@ kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output +schema.registry.url=http://localhost:8081 + num.threads=1 commit.interval.ms=1000 cache.max.bytes.buffering=-1 diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java index b87f9b8408dc55f8e293bf252117989e7d871687..72e07ffeec0ae02d0721384f06ab9052dcd82141 100644 --- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java +++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java @@ -14,6 +14,8 @@ public final class ConfigurationKeys { public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java index b26b8d60ecb56aeff03c71ce200972e6243abc4d..4ae68be7160fb5f4d9bf41d869f33c7c1724153a 100644 --- 
a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java @@ -4,11 +4,16 @@ import com.google.common.math.Stats; import org.apache.commons.configuration2.Configuration; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; +import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema; +import org.apache.flink.formats.avro.typeutils.AvroSerializer; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -18,15 +23,15 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; import org.apache.kafka.common.serialization.Serdes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde; -import theodolite.commons.flink.serialization.FlinkMonitoringRecordSerde; import 
theodolite.commons.flink.serialization.StatsSerializer; +import theodolite.uc2.application.ConfigurationKeys; import titan.ccp.common.configuration.Configurations; -import titan.ccp.models.records.ActivePowerRecord; -import titan.ccp.models.records.ActivePowerRecordFactory; +import titan.ccp.model.records.ActivePowerRecord; import java.io.IOException; import java.util.Properties; @@ -49,6 +54,7 @@ public class HistoryServiceFlinkJob { final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final int windowDuration = this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES); final String stateBackend = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase(); final String stateBackendPath = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend"); @@ -58,12 +64,11 @@ public class HistoryServiceFlinkJob { final Properties kafkaProps = new Properties(); kafkaProps.setProperty("bootstrap.servers", kafkaBroker); kafkaProps.setProperty("group.id", applicationId); - - final FlinkMonitoringRecordSerde<ActivePowerRecord, ActivePowerRecordFactory> sourceSerde = - new FlinkMonitoringRecordSerde<>( - inputTopic, - ActivePowerRecord.class, - ActivePowerRecordFactory.class); + + final DeserializationSchema<ActivePowerRecord> sourceSerde = + ConfluentRegistryAvroDeserializationSchema.forSpecific( + ActivePowerRecord.class, + schemaRegistryUrl); final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>( inputTopic, sourceSerde, kafkaProps); @@ -73,7 +78,7 @@ public class HistoryServiceFlinkJob { kafkaSource.setCommitOffsetsOnCheckpoints(true); 
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()); - final FlinkKafkaKeyValueSerde<String, String> sinkSerde = + final KafkaSerializationSchema<Tuple2<String, String>> sinkSerde = new FlinkKafkaKeyValueSerde<>(outputTopic, Serdes::String, Serdes::String, @@ -103,11 +108,6 @@ public class HistoryServiceFlinkJob { env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize)); } - env.getConfig().registerTypeWithKryoSerializer(ActivePowerRecord.class, - new FlinkMonitoringRecordSerde<>( - inputTopic, - ActivePowerRecord.class, - ActivePowerRecordFactory.class)); env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer()); env.getConfig().getRegisteredTypesWithKryoSerializers().forEach((c, s) -> diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsAggregateFunction.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsAggregateFunction.java index b5bc504677efc2fa83e44662efd0a92df239bdbd..5c90d9bb70c674b0ec59781d816fc4057c31a085 100644 --- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsAggregateFunction.java +++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsAggregateFunction.java @@ -4,7 +4,7 @@ import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; import org.apache.flink.api.common.functions.AggregateFunction; import theodolite.uc2.application.util.StatsFactory; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; /** * Statistical aggregation of {@link ActivePowerRecord}s using {@link Stats}. 
diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc2-application-flink/src/main/resources/META-INF/application.properties index 171eec7cfc7741956a27d0e1de112de01ab1f1ed..f971390984ee41be1fce54e62f4f43ee2b9c02da 100644 --- a/theodolite-benchmarks/uc2-application-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc2-application-flink/src/main/resources/META-INF/application.properties @@ -4,6 +4,7 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output +schema.registry.url=http://localhost:8081 num.threads=1 commit.interval.ms=100 cache.max.bytes.buffering=-1 diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java index 1604ec73e0313f6c022985ffff969597baefd737..00e9202899fdabd4009ad10c0e357442a013e658 100644 --- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java +++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java @@ -14,6 +14,8 @@ public final class ConfigurationKeys { public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java index c3f4fd21d10bc73421f16182e8f20fcc8988c47b..900c375148d2dc2e69285d72a5e0150e30346d4e 100644 --- 
a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java @@ -4,11 +4,13 @@ import com.google.common.math.Stats; import org.apache.commons.configuration2.Configuration; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -22,15 +24,14 @@ import org.apache.kafka.common.serialization.Serdes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde; -import theodolite.commons.flink.serialization.FlinkMonitoringRecordSerde; import theodolite.commons.flink.serialization.StatsSerializer; +import theodolite.uc3.application.ConfigurationKeys; import theodolite.uc3.application.util.HourOfDayKey; import theodolite.uc3.application.util.HourOfDayKeyFactory; import theodolite.uc3.application.util.HourOfDayKeySerde; import theodolite.uc3.application.util.StatsKeyFactory; import titan.ccp.common.configuration.Configurations; -import titan.ccp.models.records.ActivePowerRecord; -import titan.ccp.models.records.ActivePowerRecordFactory; +import titan.ccp.model.records.ActivePowerRecord; import java.io.IOException; import 
java.time.Instant; @@ -57,6 +58,7 @@ public class HistoryServiceFlinkJob { final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final String timeZoneString = this.config.getString(ConfigurationKeys.TIME_ZONE); final ZoneId timeZone = ZoneId.of(timeZoneString); final Time aggregationDuration = Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)); @@ -71,12 +73,11 @@ public class HistoryServiceFlinkJob { kafkaProps.setProperty("group.id", applicationId); // Sources and Sinks with Serializer and Deserializer - - final FlinkMonitoringRecordSerde<ActivePowerRecord, ActivePowerRecordFactory> sourceSerde = - new FlinkMonitoringRecordSerde<>( - inputTopic, - ActivePowerRecord.class, - ActivePowerRecordFactory.class); + + final DeserializationSchema<ActivePowerRecord> sourceSerde = + ConfluentRegistryAvroDeserializationSchema.forSpecific( + ActivePowerRecord.class, + schemaRegistryUrl); final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>( inputTopic, sourceSerde, kafkaProps); @@ -120,11 +121,6 @@ public class HistoryServiceFlinkJob { // Kryo serializer registration env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class, new HourOfDayKeySerde()); - env.getConfig().registerTypeWithKryoSerializer(ActivePowerRecord.class, - new FlinkMonitoringRecordSerde<>( - inputTopic, - ActivePowerRecord.class, - ActivePowerRecordFactory.class)); env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer()); env.getConfig().getRegisteredTypesWithKryoSerializers().forEach((c, s) -> diff --git 
a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/StatsAggregateFunction.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/StatsAggregateFunction.java index 65af4f4511e53fa44ae1ab43c2aaac48e5e251fa..277f92b5054aa41cae8953d62f0f6975e9ddda87 100644 --- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/StatsAggregateFunction.java +++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/StatsAggregateFunction.java @@ -4,7 +4,7 @@ import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; import org.apache.flink.api.common.functions.AggregateFunction; import theodolite.uc3.application.util.StatsFactory; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; /** * Statistical aggregation of {@link ActivePowerRecord}s using {@link Stats}. @@ -36,4 +36,4 @@ public class StatsAggregateFunction implements AggregateFunction<ActivePowerReco statsAccumulator.addAll(b); return statsAccumulator.snapshot(); } -} \ No newline at end of file +} diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc3-application-flink/src/main/resources/META-INF/application.properties index 4687602c2e2f52ba9dcb43d3b86850916e317261..6b6874674ce6a0abea73ea6d983c00c15deb8bb1 100644 --- a/theodolite-benchmarks/uc3-application-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc3-application-flink/src/main/resources/META-INF/application.properties @@ -4,6 +4,7 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output +schema.registry.url=http://localhost:8081 aggregation.duration.days=30 aggregation.advance.days=1 num.threads=1 diff --git 
a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java index e56e67573e0a6201b9a7145c597e60d0f8599267..bb835eddb3aacef9fe4253dd710c015649573d16 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java @@ -1,13 +1,17 @@ package theodolite.uc4.application; +import org.apache.avro.specific.SpecificRecord; import org.apache.commons.configuration2.Configuration; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; +import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -17,25 +21,24 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.kafka.common.serialization.Serde; import 
org.apache.kafka.common.serialization.Serdes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde; -import theodolite.commons.flink.serialization.FlinkMonitoringRecordSerde; +import theodolite.uc4.application.ConfigurationKeys; import theodolite.uc4.application.util.ImmutableSensorRegistrySerializer; import theodolite.uc4.application.util.ImmutableSetSerializer; import theodolite.uc4.application.util.SensorParentKey; import theodolite.uc4.application.util.SensorParentKeySerializer; import titan.ccp.common.configuration.Configurations; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; import titan.ccp.configuration.events.Event; import titan.ccp.configuration.events.EventSerde; import titan.ccp.model.sensorregistry.ImmutableSensorRegistry; import titan.ccp.model.sensorregistry.SensorRegistry; -import titan.ccp.models.records.ActivePowerRecord; -import titan.ccp.models.records.ActivePowerRecordFactory; -import titan.ccp.models.records.AggregatedActivePowerRecord; -import titan.ccp.models.records.AggregatedActivePowerRecordFactory; +import titan.ccp.model.records.ActivePowerRecord; +import titan.ccp.model.records.AggregatedActivePowerRecord; import java.io.IOException; import java.time.Duration; @@ -60,6 +63,7 @@ public class AggregationServiceFlinkJob { final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final Time windowSize = Time.milliseconds(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS)); final Duration windowGrace = Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS)); final 
String configurationTopic = this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC); @@ -75,11 +79,11 @@ public class AggregationServiceFlinkJob { // Sources and Sinks with Serializer and Deserializer - // Source from input topic with ActivePowerRecords - final FlinkMonitoringRecordSerde<ActivePowerRecord, ActivePowerRecordFactory> inputSerde = - new FlinkMonitoringRecordSerde<>(inputTopic, - ActivePowerRecord.class, - ActivePowerRecordFactory.class); + // Source from input topic with ActivePowerRecords + final DeserializationSchema<ActivePowerRecord> inputSerde = + ConfluentRegistryAvroDeserializationSchema.forSpecific( + ActivePowerRecord.class, + schemaRegistryUrl); final FlinkKafkaConsumer<ActivePowerRecord> kafkaInputSource = new FlinkKafkaConsumer<>( inputTopic, inputSerde, kafkaProps); @@ -90,10 +94,10 @@ public class AggregationServiceFlinkJob { } // Source from output topic with AggregatedPowerRecords - final FlinkMonitoringRecordSerde<AggregatedActivePowerRecord, AggregatedActivePowerRecordFactory> outputSerde = - new FlinkMonitoringRecordSerde<>(inputTopic, - AggregatedActivePowerRecord.class, - AggregatedActivePowerRecordFactory.class); + final DeserializationSchema<AggregatedActivePowerRecord> outputSerde = + ConfluentRegistryAvroDeserializationSchema.forSpecific( + AggregatedActivePowerRecord.class, + schemaRegistryUrl); final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource = new FlinkKafkaConsumer<>( outputTopic, outputSerde, kafkaProps); @@ -124,7 +128,7 @@ public class AggregationServiceFlinkJob { new FlinkKafkaKeyValueSerde<>( outputTopic, Serdes::String, - () -> IMonitoringRecordSerde.serde(new AggregatedActivePowerRecordFactory()), + () -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(), TypeInformation.of(new TypeHint<Tuple2<String, AggregatedActivePowerRecord>>() { })); @@ -161,12 +165,6 @@ public class AggregationServiceFlinkJob { } // Kryo serializer registration - 
env.getConfig().registerTypeWithKryoSerializer(ActivePowerRecord.class, - new FlinkMonitoringRecordSerde<>( - inputTopic, - ActivePowerRecord.class, - ActivePowerRecordFactory.class)); - env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class, new ImmutableSensorRegistrySerializer()); env.getConfig().registerTypeWithKryoSerializer(SensorParentKey.class, new SensorParentKeySerializer()); @@ -298,8 +296,6 @@ public class AggregationServiceFlinkJob { "AggregatedActivePowerRecord { " + "identifier: " + value.getIdentifier() + ", " + "timestamp: " + value.getTimestamp() + ", " - + "minInW: " + value.getMinInW() + ", " - + "maxInW: " + value.getMaxInW() + ", " + "count: " + value.getCount() + ", " + "sumInW: " + value.getSumInW() + ", " + "avgInW: " + value.getAverageInW() + " }"; diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java index a5be34aeb611516b3691041af3fbec1bf004fdfb..d0682a0fa4c91c8bb5257db45f03f96ff95ad296 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java @@ -15,6 +15,8 @@ public final class ConfigurationKeys { public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; public static final String WINDOW_SIZE_MS = "window.size.ms"; diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java index 
bd335132b4ddeb9526311982e49388aefc4d5d37..ceeb46e640734525ce37acbd41dbe2137f046803 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java @@ -10,7 +10,7 @@ import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; import org.apache.flink.util.Collector; import theodolite.uc4.application.util.SensorParentKey; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; import java.util.Set; diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java index ad24310b0439f9dcce1a79ff2c441fb6ce923d10..8e4f965b4b1d2e30dea60d7211e737616927b264 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java @@ -10,7 +10,7 @@ import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction import org.apache.flink.util.Collector; import theodolite.uc4.application.util.SensorParentKey; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; import java.util.Set; diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java index 
db0a47721e16062cd0675bde8e3d15cc8a124531..ea21899f656758b93f2032634671b7cd2b285c0e 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java @@ -12,8 +12,8 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import theodolite.uc4.application.util.SensorParentKey; -import titan.ccp.models.records.ActivePowerRecord; -import titan.ccp.models.records.AggregatedActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; +import titan.ccp.model.records.AggregatedActivePowerRecord; public class RecordAggregationProcessWindowFunction extends ProcessWindowFunction<Tuple2<SensorParentKey, ActivePowerRecord>, AggregatedActivePowerRecord, String, TimeWindow> { @@ -46,7 +46,7 @@ public class RecordAggregationProcessWindowFunction extends ProcessWindowFunctio for (Tuple2<SensorParentKey, ActivePowerRecord> t : elements) { AggregatedActivePowerRecord currentAggregate = this.aggregateState.value(); if (currentAggregate == null) { - currentAggregate = new AggregatedActivePowerRecord(key, 0, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, 0, 0, 0); + currentAggregate = new AggregatedActivePowerRecord(key, 0L, 0L, 0.0, 0.0); this.aggregateState.update(currentAggregate); } long count = currentAggregate.getCount(); @@ -55,14 +55,14 @@ public class RecordAggregationProcessWindowFunction extends ProcessWindowFunctio ActivePowerRecord newRecord = t.f1; if (newRecord == null) { // sensor was deleted -> decrease count, set newRecord to zero count--; - newRecord = new ActivePowerRecord(sensorParentKey.getSensor(), 0, 0.0); + newRecord = new ActivePowerRecord(sensorParentKey.getSensor(), 0L, 0.0); } // get last value of this record from state or create 0 valued record ActivePowerRecord previousRecord = 
this.lastValueState.get(sensorParentKey); if (previousRecord == null) { // sensor was added -> increase count count++; - previousRecord = new ActivePowerRecord(sensorParentKey.getSensor(), 0, 0.0); + previousRecord = new ActivePowerRecord(sensorParentKey.getSensor(), 0L, 0.0); } // if incoming record is older than the last saved record, skip the record @@ -78,8 +78,6 @@ public class RecordAggregationProcessWindowFunction extends ProcessWindowFunctio AggregatedActivePowerRecord newAggregate = new AggregatedActivePowerRecord( sensorParentKey.getParent(), timestamp, - Math.min(currentAggregate.getMinInW(), newRecord.getValueInW()), - Math.max(currentAggregate.getMaxInW(), newRecord.getValueInW()), count, sumInW, avgInW diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-application-flink/src/main/resources/META-INF/application.properties index 571a090c3e55bb38837e6f5ca3c0b8594279cacc..de85fdb88c0462edc9fba58409918470fcb8cb6c 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc4-application-flink/src/main/resources/META-INF/application.properties @@ -8,6 +8,7 @@ configuration.kafka.topic=configuration kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output +schema.registry.url=http://localhost:8081 window.size.ms=1000 window.grace.ms=0 num.threads=1