diff --git a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
index 4778acde357653d07a33f43b4ff249b0d20233ad..10546b515a645c6f0c7033b4166012b3bc587fca 100644
--- a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
@@ -1,14 +1,12 @@
 package theodolite.uc1.application;
 
-import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.KafkaConnectorFactory;
 import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;
 
@@ -31,21 +29,11 @@ public class HistoryServiceFlinkJob {
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
-    final Properties kafkaProps = new Properties();
-    kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
-    kafkaProps.setProperty("group.id", applicationId);
-
-    final DeserializationSchema<ActivePowerRecord> serde =
-        ConfluentRegistryAvroDeserializationSchema.forSpecific(
-            ActivePowerRecord.class,
-            schemaRegistryUrl);
+    final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
+        applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
 
     final FlinkKafkaConsumer<ActivePowerRecord> kafkaConsumer =
-        new FlinkKafkaConsumer<>(inputTopic, serde, kafkaProps);
-    kafkaConsumer.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
-    }
+        kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
 
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
     if (checkpointing) {
diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
index 4e75c3a8de1f3b25b00822bdec106c8b76b43af7..1a7e6a2dd20b4dfbbde841a84829a144d514cc55 100644
--- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
@@ -1,13 +1,9 @@
 package theodolite.uc2.application;
 
 import com.google.common.math.Stats;
-import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -15,12 +11,11 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.KafkaConnectorFactory;
 import theodolite.commons.flink.StateBackends;
-import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
 import theodolite.commons.flink.serialization.StatsSerializer;
 import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;
@@ -48,32 +43,17 @@ public class HistoryServiceFlinkJob {
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
     final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
 
-    final Properties kafkaProps = new Properties();
-    kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
-    kafkaProps.setProperty("group.id", applicationId);
+    final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
+        applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
 
-    final DeserializationSchema<ActivePowerRecord> sourceSerde =
-        ConfluentRegistryAvroDeserializationSchema.forSpecific(
-            ActivePowerRecord.class,
-            schemaRegistryUrl);
+    final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource =
+        kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
 
-    final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>(
-        inputTopic, sourceSerde, kafkaProps);
-    kafkaSource.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaSource.setCommitOffsetsOnCheckpoints(true);
-    }
-    kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
-
-    final KafkaSerializationSchema<Tuple2<String, String>> sinkSerde =
-        new FlinkKafkaKeyValueSerde<>(outputTopic,
+    final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink =
+        kafkaConnector.createProducer(outputTopic,
             Serdes::String,
             Serdes::String,
             Types.TUPLE(Types.STRING, Types.STRING));
-    kafkaProps.setProperty("transaction.timeout.ms", "" + 5 * 60 * 1000); // TODO necessary?
-    final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>(
-        outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
-    kafkaSink.setWriteTimestampToKafka(true);
 
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
index c788d691f3a9819fe6ded847cb2ab4ae4b718b61..cd00001218181e64fb015ea965a773e26c8184a2 100644
--- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
@@ -4,14 +4,10 @@ import com.google.common.math.Stats;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
-import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -22,8 +18,8 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.KafkaConnectorFactory;
 import theodolite.commons.flink.StateBackends;
-import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
 import theodolite.commons.flink.serialization.StatsSerializer;
 import theodolite.uc3.application.util.HourOfDayKey;
 import theodolite.uc3.application.util.HourOfDayKeyFactory;
@@ -32,7 +28,6 @@ import theodolite.uc3.application.util.StatsKeyFactory;
 import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;
 
-
 /**
  * The History microservice implemented as a Flink job.
  */
@@ -52,45 +47,26 @@ public class HistoryServiceFlinkJob {
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
-    final String timeZoneString = this.config.getString(ConfigurationKeys.TIME_ZONE);
-    final ZoneId timeZone = ZoneId.of(timeZoneString);
+    final ZoneId timeZone = ZoneId.of(this.config.getString(ConfigurationKeys.TIME_ZONE));
     final Time aggregationDuration =
-        Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
+        Time.seconds(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
     final Time aggregationAdvance =
-        Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
+        Time.seconds(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
     final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
-    final Properties kafkaProps = new Properties();
-    kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
-    kafkaProps.setProperty("group.id", applicationId);
-
-    // Sources and Sinks with Serializer and Deserializer
-
-    final DeserializationSchema<ActivePowerRecord> sourceSerde =
-        ConfluentRegistryAvroDeserializationSchema.forSpecific(
-            ActivePowerRecord.class,
-            schemaRegistryUrl);
-
-    final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>(
-        inputTopic, sourceSerde, kafkaProps);
+    final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
+        applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
 
-    kafkaSource.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaSource.setCommitOffsetsOnCheckpoints(true);
-    }
-    kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
-
-    final FlinkKafkaKeyValueSerde<String, String> sinkSerde =
-        new FlinkKafkaKeyValueSerde<>(outputTopic,
+    // Sources and Sinks
+    final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource =
+        kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
+    final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink =
+        kafkaConnector.createProducer(outputTopic,
             Serdes::String,
             Serdes::String,
             Types.TUPLE(Types.STRING, Types.STRING));
-    final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>(
-        outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
-    kafkaSink.setWriteTimestampToKafka(true);
-
     // Execution environment configuration
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
index 9ac16549b0d69942c962f080c4a9ff6d601b42e9..a539608e2e8273dc57792d9eeea0dfb3155b10fb 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
@@ -1,17 +1,13 @@
 package theodolite.uc4.application;
 
 import java.time.Duration;
-import java.util.Properties;
 import java.util.Set;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -23,8 +19,9 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.KafkaConnectorFactory;
 import theodolite.commons.flink.StateBackends;
-import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
+import theodolite.commons.flink.TupleType;
 import theodolite.uc4.application.util.ImmutableSensorRegistrySerializer;
 import theodolite.uc4.application.util.ImmutableSetSerializer;
 import theodolite.uc4.application.util.SensorParentKey;
@@ -67,72 +64,33 @@ public class AggregationServiceFlinkJob {
     final boolean debug = this.config.getBoolean(ConfigurationKeys.DEBUG, true);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
-    final Properties kafkaProps = new Properties();
-    kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
-    kafkaProps.setProperty("group.id", applicationId);
-
-    // Sources and Sinks with Serializer and Deserializer
+    final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
+        applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
 
     // Source from input topic with ActivePowerRecords
-    final DeserializationSchema<ActivePowerRecord> inputSerde =
-        ConfluentRegistryAvroDeserializationSchema.forSpecific(
-            ActivePowerRecord.class,
-            schemaRegistryUrl);
-
-    final FlinkKafkaConsumer<ActivePowerRecord> kafkaInputSource = new FlinkKafkaConsumer<>(
-        inputTopic, inputSerde, kafkaProps);
-
-    kafkaInputSource.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaInputSource.setCommitOffsetsOnCheckpoints(true);
-    }
+    final FlinkKafkaConsumer<ActivePowerRecord> kafkaInputSource =
+        kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
+    // TODO Watermarks?
 
     // Source from output topic with AggregatedPowerRecords
-    final DeserializationSchema<AggregatedActivePowerRecord> outputSerde =
-        ConfluentRegistryAvroDeserializationSchema.forSpecific(
-            AggregatedActivePowerRecord.class,
-            schemaRegistryUrl);
-
     final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource =
-        new FlinkKafkaConsumer<>(
-            outputTopic, outputSerde, kafkaProps);
-
-    kafkaOutputSource.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaOutputSource.setCommitOffsetsOnCheckpoints(true);
-    }
+        kafkaConnector.createConsumer(outputTopic, AggregatedActivePowerRecord.class);
 
-    // Source from configuration topic with EventSensorRegistry JSON
-    final FlinkKafkaKeyValueSerde<Event, String> configSerde =
-        new FlinkKafkaKeyValueSerde<>(
+    final FlinkKafkaConsumer<Tuple2<Event, String>> kafkaConfigSource =
+        kafkaConnector.createConsumer(
             configurationTopic,
             EventSerde::serde,
             Serdes::String,
-            TypeInformation.of(new TypeHint<Tuple2<Event, String>>() {}));
-
-    final FlinkKafkaConsumer<Tuple2<Event, String>> kafkaConfigSource = new FlinkKafkaConsumer<>(
-        configurationTopic, configSerde, kafkaProps);
-    kafkaConfigSource.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaConfigSource.setCommitOffsetsOnCheckpoints(true);
-    }
+            TupleType.of(TypeInformation.of(Event.class), Types.STRING));
 
     // Sink to output topic with SensorId, AggregatedActivePowerRecord
-    final FlinkKafkaKeyValueSerde<String, AggregatedActivePowerRecord> aggregationSerde =
-        new FlinkKafkaKeyValueSerde<>(
+    final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink =
+        kafkaConnector.createProducer(
             outputTopic,
             Serdes::String,
             () -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(),
             Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class)));
-    final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink =
-        new FlinkKafkaProducer<>(
-            outputTopic,
-            aggregationSerde,
-            kafkaProps,
-            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
-    kafkaAggregationSink.setWriteTimestampToKafka(true);
-
     // Execution environment configuration
     // org.apache.flink.configuration.Configuration conf = new
     // org.apache.flink.configuration.Configuration();
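Note on the new API: KafkaConnectorFactory and the TupleType helper are introduced by this changeset, but their sources are not part of the excerpt above, so only their call sites are visible. For review context, here is a minimal sketch of what such a factory could look like, reconstructed purely from those call sites and the boilerplate deleted in the four jobs. It is an illustration, not the actual theodolite.commons.flink implementation: the SerdeSupplier interface, the configureConsumer and buildProperties helpers, the FlinkKafkaKeyValueSerde parameter types, and the placement of watermark assignment are all assumptions.

package theodolite.commons.flink;

import java.io.Serializable;
import java.util.Properties;
import java.util.function.Supplier;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.serialization.Serde;
import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;

/**
 * Sketch: bundles the per-job boilerplate deleted above (connection properties,
 * group offsets, offset commits on checkpoints, serde wiring, producer semantics)
 * behind three creation methods.
 */
public class KafkaConnectorFactory {

  /** Hypothetical serializable serde supplier, so lambdas like Serdes::String can ship with the job graph. */
  public interface SerdeSupplier<T> extends Supplier<Serde<T>>, Serializable {
  }

  private final String appName;
  private final String kafkaBrokers;
  private final boolean checkpointing;
  private final String schemaRegistryUrl;

  public KafkaConnectorFactory(final String appName, final String kafkaBrokers,
      final boolean checkpointing, final String schemaRegistryUrl) {
    this.appName = appName;
    this.kafkaBrokers = kafkaBrokers;
    this.checkpointing = checkpointing;
    this.schemaRegistryUrl = schemaRegistryUrl;
  }

  /** Consumer for Avro SpecificRecord values, deserialized via the Confluent schema registry. */
  public <T extends SpecificRecord> FlinkKafkaConsumer<T> createConsumer(final String topic,
      final Class<T> typeClass) {
    final DeserializationSchema<T> schema =
        ConfluentRegistryAvroDeserializationSchema.forSpecific(typeClass, this.schemaRegistryUrl);
    return this.configureConsumer(new FlinkKafkaConsumer<>(topic, schema, this.buildProperties()));
  }

  /**
   * Consumer for key-value records. Assumes FlinkKafkaKeyValueSerde implements
   * KafkaDeserializationSchema, as its dual use in the removed code suggests.
   */
  public <K, V> FlinkKafkaConsumer<Tuple2<K, V>> createConsumer(final String topic,
      final SerdeSupplier<K> keySerde, final SerdeSupplier<V> valueSerde,
      final TypeInformation<Tuple2<K, V>> typeInformation) {
    final FlinkKafkaKeyValueSerde<K, V> serde =
        new FlinkKafkaKeyValueSerde<>(topic, keySerde, valueSerde, typeInformation);
    return this.configureConsumer(new FlinkKafkaConsumer<>(topic, serde, this.buildProperties()));
  }

  /** Producer for key-value records, at-least-once, writing record timestamps to Kafka. */
  public <K, V> FlinkKafkaProducer<Tuple2<K, V>> createProducer(final String topic,
      final SerdeSupplier<K> keySerde, final SerdeSupplier<V> valueSerde,
      final TypeInformation<Tuple2<K, V>> typeInformation) {
    final FlinkKafkaKeyValueSerde<K, V> serde =
        new FlinkKafkaKeyValueSerde<>(topic, keySerde, valueSerde, typeInformation);
    final Properties producerProps = this.buildProperties();
    // Carried over from the removed uc2 code, where it was flagged "TODO necessary?".
    producerProps.setProperty("transaction.timeout.ms", String.valueOf(5 * 60 * 1000));
    final FlinkKafkaProducer<Tuple2<K, V>> producer = new FlinkKafkaProducer<>(
        topic, serde, producerProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
    producer.setWriteTimestampToKafka(true);
    return producer;
  }

  private <T> FlinkKafkaConsumer<T> configureConsumer(final FlinkKafkaConsumer<T> consumer) {
    consumer.setStartFromGroupOffsets();
    if (this.checkpointing) {
      consumer.setCommitOffsetsOnCheckpoints(true);
    }
    // Previously done per job in uc2/uc3; assumed to have moved into the factory.
    consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
    return consumer;
  }

  private Properties buildProperties() {
    final Properties props = new Properties();
    props.setProperty("bootstrap.servers", this.kafkaBrokers);
    props.setProperty("group.id", this.appName);
    return props;
  }
}

Under this reading, the forMonotonousTimestamps() watermarks that uc2 and uc3 previously assigned per job move into the factory, which would explain both their deletion above and the new "// TODO Watermarks?" in uc4, where watermarks on the raw input source may or may not be wanted.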