Commit 8c172f32 authored by Sören Henning

Use new KafkaConnectorFactory in benchmarks

parent 6e3c43d3
1 merge request: !90 Migrate Flink benchmark implementation
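The KafkaConnectorFactory itself lives in theodolite.commons.flink and is not part of this diff. For orientation, the sketch below reconstructs what its simple Avro-consumer path presumably does, based solely on the boilerplate the four diffs below remove. Only the constructor and call signatures are taken from the call sites; the field layout and method bodies are assumptions.

    package theodolite.commons.flink;

    import java.util.Properties;
    import org.apache.avro.specific.SpecificRecord;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    // Hypothetical reconstruction of the factory's consumer path, inferred
    // from the boilerplate removed in uc1-uc4; not copied from the real class.
    public class KafkaConnectorFactory {

      private final String applicationId;
      private final String kafkaBroker;
      private final boolean checkpointing;
      private final String schemaRegistryUrl;

      public KafkaConnectorFactory(final String applicationId, final String kafkaBroker,
          final boolean checkpointing, final String schemaRegistryUrl) {
        this.applicationId = applicationId;
        this.kafkaBroker = kafkaBroker;
        this.checkpointing = checkpointing;
        this.schemaRegistryUrl = schemaRegistryUrl;
      }

      public <T extends SpecificRecord> FlinkKafkaConsumer<T> createConsumer(
          final String topic, final Class<T> typeClass) {
        // Same Confluent Schema Registry deserializer the jobs created inline before.
        final DeserializationSchema<T> schema = ConfluentRegistryAvroDeserializationSchema
            .forSpecific(typeClass, this.schemaRegistryUrl);
        final FlinkKafkaConsumer<T> consumer =
            new FlinkKafkaConsumer<>(topic, schema, this.buildKafkaProperties());
        consumer.setStartFromGroupOffsets();
        if (this.checkpointing) {
          // Commit offsets on checkpoints only when checkpointing is enabled,
          // exactly as the removed per-job if-blocks did.
          consumer.setCommitOffsetsOnCheckpoints(true);
        }
        return consumer;
      }

      private Properties buildKafkaProperties() {
        final Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", this.kafkaBroker);
        kafkaProps.setProperty("group.id", this.applicationId);
        return kafkaProps;
      }
    }
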
theodolite/uc1/application/HistoryServiceFlinkJob.java

 package theodolite.uc1.application;

-import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.KafkaConnectorFactory;
 import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;

@@ -31,21 +29,11 @@ public class HistoryServiceFlinkJob {
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);

-    final Properties kafkaProps = new Properties();
-    kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
-    kafkaProps.setProperty("group.id", applicationId);
-
-    final DeserializationSchema<ActivePowerRecord> serde =
-        ConfluentRegistryAvroDeserializationSchema.forSpecific(
-            ActivePowerRecord.class,
-            schemaRegistryUrl);
+    final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
+        applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);

     final FlinkKafkaConsumer<ActivePowerRecord> kafkaConsumer =
-        new FlinkKafkaConsumer<>(inputTopic, serde, kafkaProps);
-    kafkaConsumer.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
-    }
+        kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);

     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

     if (checkpointing) {
...
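Downstream wiring is unchanged by this commit. For orientation, a minimal usage sketch of the consumer created above; the pipeline body is elided in the diff, so the print sink and operator name here are illustrative only, not from the commit:

    // Attach the factory-created consumer as the job's source.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final DataStream<ActivePowerRecord> stream = env.addSource(kafkaConsumer)
        .name("[Kafka Consumer] Topic: " + inputTopic); // operator name is an assumption
    stream.print(); // placeholder for the job's actual logic (elided above)
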
theodolite/uc2/application/HistoryServiceFlinkJob.java

 package theodolite.uc2.application;

 import com.google.common.math.Stats;
-import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

@@ -15,12 +11,11 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.KafkaConnectorFactory;
 import theodolite.commons.flink.StateBackends;
-import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
 import theodolite.commons.flink.serialization.StatsSerializer;
 import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;

@@ -48,32 +43,17 @@ public class HistoryServiceFlinkJob {
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
     final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);

-    final Properties kafkaProps = new Properties();
-    kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
-    kafkaProps.setProperty("group.id", applicationId);
-
-    final DeserializationSchema<ActivePowerRecord> sourceSerde =
-        ConfluentRegistryAvroDeserializationSchema.forSpecific(
-            ActivePowerRecord.class,
-            schemaRegistryUrl);
-
-    final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>(
-        inputTopic, sourceSerde, kafkaProps);
-    kafkaSource.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaSource.setCommitOffsetsOnCheckpoints(true);
-    }
-    kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
-
-    final KafkaSerializationSchema<Tuple2<String, String>> sinkSerde =
-        new FlinkKafkaKeyValueSerde<>(outputTopic,
+    final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
+        applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
+
+    final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource =
+        kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
+
+    final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink =
+        kafkaConnector.createProducer(outputTopic,
             Serdes::String,
             Serdes::String,
             Types.TUPLE(Types.STRING, Types.STRING));
-    kafkaProps.setProperty("transaction.timeout.ms", "" + 5 * 60 * 1000); // TODO necessary?
-    final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>(
-        outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
-    kafkaSink.setWriteTimestampToKafka(true);

     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
...
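The removed uc2 lines likewise indicate what the factory's producer path must cover: the key-value serialization schema, the transaction timeout, the at-least-once semantic, and writing timestamps to Kafka. Continuing the sketch above under the same caveat: the method body and the SerializableSupplier type name are assumptions inferred from the call sites, and FlinkKafkaKeyValueSerde is the repository's own KafkaSerializationSchema implementation, dropped from the jobs' imports above.

    // Hypothetical createProducer, inferred from the removed uc2 code.
    public <K, V> FlinkKafkaProducer<Tuple2<K, V>> createProducer(
        final String topic,
        final SerializableSupplier<Serde<K>> keySerde,    // name assumed from Serdes::String call sites
        final SerializableSupplier<Serde<V>> valueSerde,
        final TypeInformation<Tuple2<K, V>> typeInformation) {
      final KafkaSerializationSchema<Tuple2<K, V>> schema =
          new FlinkKafkaKeyValueSerde<>(topic, keySerde, valueSerde, typeInformation);
      final Properties kafkaProps = this.buildKafkaProperties();
      // Carried over from the removed code, including its open question:
      kafkaProps.setProperty("transaction.timeout.ms", "" + 5 * 60 * 1000); // TODO necessary?
      final FlinkKafkaProducer<Tuple2<K, V>> producer = new FlinkKafkaProducer<>(
          topic, schema, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
      producer.setWriteTimestampToKafka(true);
      return producer;
    }
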
theodolite/uc3/application/HistoryServiceFlinkJob.java

@@ -4,14 +4,10 @@ import com.google.common.math.Stats;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
-import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

@@ -22,8 +18,8 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.KafkaConnectorFactory;
 import theodolite.commons.flink.StateBackends;
-import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
 import theodolite.commons.flink.serialization.StatsSerializer;
 import theodolite.uc3.application.util.HourOfDayKey;
 import theodolite.uc3.application.util.HourOfDayKeyFactory;

@@ -32,7 +28,6 @@ import theodolite.uc3.application.util.StatsKeyFactory;
 import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;

 /**
  * The History microservice implemented as a Flink job.
  */

@@ -52,45 +47,26 @@ public class HistoryServiceFlinkJob {
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
-    final String timeZoneString = this.config.getString(ConfigurationKeys.TIME_ZONE);
-    final ZoneId timeZone = ZoneId.of(timeZoneString);
+    final ZoneId timeZone = ZoneId.of(this.config.getString(ConfigurationKeys.TIME_ZONE));
     final Time aggregationDuration =
-        Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
+        Time.seconds(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
     final Time aggregationAdvance =
-        Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
+        Time.seconds(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
     final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);

-    final Properties kafkaProps = new Properties();
-    kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
-    kafkaProps.setProperty("group.id", applicationId);
-
-    // Sources and Sinks with Serializer and Deserializer
-    final DeserializationSchema<ActivePowerRecord> sourceSerde =
-        ConfluentRegistryAvroDeserializationSchema.forSpecific(
-            ActivePowerRecord.class,
-            schemaRegistryUrl);
-    final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>(
-        inputTopic, sourceSerde, kafkaProps);
-    kafkaSource.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaSource.setCommitOffsetsOnCheckpoints(true);
-    }
-    kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
-
-    final FlinkKafkaKeyValueSerde<String, String> sinkSerde =
-        new FlinkKafkaKeyValueSerde<>(outputTopic,
+    final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
+        applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
+
+    // Sources and Sinks
+    final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource =
+        kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
+    final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink =
+        kafkaConnector.createProducer(outputTopic,
             Serdes::String,
             Serdes::String,
             Types.TUPLE(Types.STRING, Types.STRING));
-    final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>(
-        outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
-    kafkaSink.setWriteTimestampToKafka(true);

     // Execution environment configuration
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
...
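One behavioral detail worth noting: the removed uc2/uc3 code assigned WatermarkStrategy.forMonotonousTimestamps() to the source, and no equivalent call appears on the new side (the new uc4 code below even carries a "// TODO Watermarks?" marker). Assuming the factory does not assign a strategy internally, which this diff cannot confirm, it can be reattached at the call site:

    // Re-attach the watermark strategy that the old inline setup applied;
    // only needed if KafkaConnectorFactory does not do this itself (assumption).
    final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource =
        kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
    kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
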
theodolite/uc4/application/AggregationServiceFlinkJob.java

 package theodolite.uc4.application;

 import java.time.Duration;
-import java.util.Properties;
 import java.util.Set;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;

@@ -23,8 +19,9 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.KafkaConnectorFactory;
 import theodolite.commons.flink.StateBackends;
-import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
+import theodolite.commons.flink.TupleType;
 import theodolite.uc4.application.util.ImmutableSensorRegistrySerializer;
 import theodolite.uc4.application.util.ImmutableSetSerializer;
 import theodolite.uc4.application.util.SensorParentKey;

@@ -67,72 +64,33 @@ public class AggregationServiceFlinkJob {
     final boolean debug = this.config.getBoolean(ConfigurationKeys.DEBUG, true);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);

-    final Properties kafkaProps = new Properties();
-    kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
-    kafkaProps.setProperty("group.id", applicationId);
-
-    // Sources and Sinks with Serializer and Deserializer
+    final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
+        applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);

     // Source from input topic with ActivePowerRecords
-    final DeserializationSchema<ActivePowerRecord> inputSerde =
-        ConfluentRegistryAvroDeserializationSchema.forSpecific(
-            ActivePowerRecord.class,
-            schemaRegistryUrl);
-    final FlinkKafkaConsumer<ActivePowerRecord> kafkaInputSource = new FlinkKafkaConsumer<>(
-        inputTopic, inputSerde, kafkaProps);
-    kafkaInputSource.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaInputSource.setCommitOffsetsOnCheckpoints(true);
-    }
+    final FlinkKafkaConsumer<ActivePowerRecord> kafkaInputSource =
+        kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
+    // TODO Watermarks?

     // Source from output topic with AggregatedPowerRecords
-    final DeserializationSchema<AggregatedActivePowerRecord> outputSerde =
-        ConfluentRegistryAvroDeserializationSchema.forSpecific(
-            AggregatedActivePowerRecord.class,
-            schemaRegistryUrl);
     final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource =
-        new FlinkKafkaConsumer<>(
-            outputTopic, outputSerde, kafkaProps);
-    kafkaOutputSource.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaOutputSource.setCommitOffsetsOnCheckpoints(true);
-    }
+        kafkaConnector.createConsumer(outputTopic, AggregatedActivePowerRecord.class);

-    // Source from configuration topic with EventSensorRegistry JSON
-    final FlinkKafkaKeyValueSerde<Event, String> configSerde =
-        new FlinkKafkaKeyValueSerde<>(
+    final FlinkKafkaConsumer<Tuple2<Event, String>> kafkaConfigSource =
+        kafkaConnector.createConsumer(
             configurationTopic,
             EventSerde::serde,
             Serdes::String,
-            TypeInformation.of(new TypeHint<Tuple2<Event, String>>() {}));
-    final FlinkKafkaConsumer<Tuple2<Event, String>> kafkaConfigSource = new FlinkKafkaConsumer<>(
-        configurationTopic, configSerde, kafkaProps);
-    kafkaConfigSource.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaConfigSource.setCommitOffsetsOnCheckpoints(true);
-    }
+            TupleType.of(TypeInformation.of(Event.class), Types.STRING));

     // Sink to output topic with SensorId, AggregatedActivePowerRecord
-    final FlinkKafkaKeyValueSerde<String, AggregatedActivePowerRecord> aggregationSerde =
-        new FlinkKafkaKeyValueSerde<>(
+    final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink =
+        kafkaConnector.createProducer(
             outputTopic,
             Serdes::String,
             () -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(),
             Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class)));
-    final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink =
-        new FlinkKafkaProducer<>(
-            outputTopic,
-            aggregationSerde,
-            kafkaProps,
-            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
-    kafkaAggregationSink.setWriteTimestampToKafka(true);

     // Execution environment configuration
     // org.apache.flink.configuration.Configuration conf = new
     // org.apache.flink.configuration.Configuration();
...
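uc4 also replaces the anonymous TypeHint with a new TupleType helper imported from theodolite.commons.flink. Its implementation is not part of this diff; a plausible sketch, assuming it merely wraps Flink's TupleTypeInfo:

    package theodolite.commons.flink;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;

    // Hypothetical sketch: builds the Tuple2 TypeInformation that the old code
    // obtained via "TypeInformation.of(new TypeHint<Tuple2<Event, String>>() {})".
    public final class TupleType {

      private TupleType() {}

      public static <K, V> TypeInformation<Tuple2<K, V>> of(
          final TypeInformation<K> keyType, final TypeInformation<V> valueType) {
        return new TupleTypeInfo<>(keyType, valueType);
      }
    }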