Migrate Flink benchmark implementation

Merged: Sören Henning requested to merge flink-benchmark-migration into master

1 file changed: +74 −66
-package theodolite.uc4.application;
+package theodolite.uc4.application; // NOPMD Imports required

 import java.time.Duration;
 import java.util.Set;

@@ -43,29 +43,80 @@ public class AggregationServiceFlinkJob {
   private static final Logger LOGGER = LoggerFactory.getLogger(AggregationServiceFlinkJob.class);

   private final Configuration config = ServiceConfigurations.createWithDefaults();

-  private void run() {
-    // Configurations
+  private final StreamExecutionEnvironment env;
+
+  private final String applicationId;
+
+  /**
+   * Create a new {@link AggregationServiceFlinkJob}.
+   */
+  public AggregationServiceFlinkJob() {
     final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
     final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    final String applicationId = applicationName + "-" + applicationVersion;
+    this.applicationId = applicationName + "-" + applicationVersion;
+
+    // Execution environment configuration
+    // org.apache.flink.configuration.Configuration conf = new
+    // org.apache.flink.configuration.Configuration();
+    // conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+    // final StreamExecutionEnvironment env =
+    // StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    this.configureEnv();
+
+    this.buildPipeline();
+  }
+
+  private void configureEnv() {
+    this.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
     final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
+    if (checkpointing) {
+      this.env.enableCheckpointing(commitIntervalMs);
+    }
+
+    // State Backend
+    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
+    this.env.setStateBackend(stateBackend);
+
+    this.configureSerializers();
+  }
+
+  private void configureSerializers() {
+    this.env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class,
+        new ImmutableSensorRegistrySerializer());
+    this.env.getConfig().registerTypeWithKryoSerializer(SensorParentKey.class,
+        new SensorParentKeySerializer());
+
+    this.env.getConfig().registerTypeWithKryoSerializer(Set.of().getClass(),
+        new ImmutableSetSerializer());
+    this.env.getConfig().registerTypeWithKryoSerializer(Set.of(1).getClass(),
+        new ImmutableSetSerializer());
+    this.env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), // NOCS
+        new ImmutableSetSerializer());
+
+    this.env.getConfig().getRegisteredTypesWithKryoSerializers()
+        .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
+            + s.getSerializer().getClass().getName()));
+  }
+
+  private void buildPipeline() {
+    // Get configurations
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
     final Time windowSize =
         Time.milliseconds(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS));
     final Duration windowGrace =
         Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS));
     final String configurationTopic =
         this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC);
-    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
-    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
-    final boolean debug = this.config.getBoolean(ConfigurationKeys.DEBUG, true);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);

     final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
-        applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
+        this.applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);

     // Source from input topic with ActivePowerRecords
     final FlinkKafkaConsumer<ActivePowerRecord> kafkaInputSource =
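
A note on the three separate Set.of(...) registrations in configureSerializers() above: Kryo registers serializers per concrete class, and the JDK's immutable sets are backed by different package-private implementation classes depending on how many elements they hold (and on the JDK version). Registering the classes of an empty, a one-element, and a four-element set is meant to cover these implementation classes. A small standalone check, not part of this change, illustrates the idea; the printed class names vary by JDK version:

import java.util.Set;

public class ImmutableSetClassesCheck {
  public static void main(final String[] args) {
    // On JDK 11, for example, these print a mix of
    // java.util.ImmutableCollections$SetN and java.util.ImmutableCollections$Set12.
    System.out.println(Set.of().getClass().getName());
    System.out.println(Set.of(1).getClass().getName());
    System.out.println(Set.of(1, 2, 3, 4).getClass().getName());
  }
}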
@@ -91,65 +142,28 @@ public class AggregationServiceFlinkJob {
             () -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(),
             Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class)));

-    // Execution environment configuration
-    // org.apache.flink.configuration.Configuration conf = new
-    // org.apache.flink.configuration.Configuration();
-    // conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
-    // final StreamExecutionEnvironment env =
-    // StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
-    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-    if (checkpointing) {
-      env.enableCheckpointing(commitIntervalMs);
-    }
-
-    // State Backend
-    env.setStateBackend(stateBackend);
-
-    // Kryo serializer registration
-    env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class,
-        new ImmutableSensorRegistrySerializer());
-    env.getConfig().registerTypeWithKryoSerializer(SensorParentKey.class,
-        new SensorParentKeySerializer());
-    env.getConfig().registerTypeWithKryoSerializer(Set.of().getClass(),
-        new ImmutableSetSerializer());
-    env.getConfig().registerTypeWithKryoSerializer(Set.of(1).getClass(),
-        new ImmutableSetSerializer());
-    env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), // NOCS
-        new ImmutableSetSerializer());
-    env.getConfig().getRegisteredTypesWithKryoSerializers()
-        .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
-            + s.getSerializer().getClass().getName()));
-
-    // Streaming topology
     // Build input stream
-    final DataStream<ActivePowerRecord> inputStream = env.addSource(kafkaInputSource)
+    final DataStream<ActivePowerRecord> inputStream = this.env.addSource(kafkaInputSource)
         .name("[Kafka Consumer] Topic: " + inputTopic)// NOCS
         .rebalance()
         .map(r -> r)
         .name("[Map] Rebalance Forward");

     // Build aggregation stream
-    final DataStream<ActivePowerRecord> aggregationsInputStream = env.addSource(kafkaOutputSource)
-        .name("[Kafka Consumer] Topic: " + outputTopic) // NOCS
-        .rebalance()
-        .map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()))
-        .name("[Map] AggregatedActivePowerRecord -> ActivePowerRecord");
+    final DataStream<ActivePowerRecord> aggregationsInputStream =
+        this.env.addSource(kafkaOutputSource)
+            .name("[Kafka Consumer] Topic: " + outputTopic) // NOCS
+            .rebalance()
+            .map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()))
+            .name("[Map] AggregatedActivePowerRecord -> ActivePowerRecord");

     // Merge input and aggregation streams
     final DataStream<ActivePowerRecord> mergedInputStream = inputStream
         .union(aggregationsInputStream);
-    if (debug) {
-      mergedInputStream.print();
-    }

     // Build parent sensor stream from configuration stream
     final DataStream<Tuple2<String, Set<String>>> configurationsStream =
-        env.addSource(kafkaConfigSource)
+        this.env.addSource(kafkaConfigSource)
             .name("[Kafka Consumer] Topic: " + configurationTopic) // NOCS
             .filter(tuple -> tuple.f0 == Event.SENSOR_REGISTRY_CHANGED
                 || tuple.f0 == Event.SENSOR_REGISTRY_STATUS)
@@ -166,12 +180,6 @@ public class AggregationServiceFlinkJob {
         .flatMap(new JoinAndDuplicateCoFlatMapFunction())
         .name("[CoFlatMap] Join input-config, Flatten to ((Sensor, Group), ActivePowerRecord)");

-    if (debug) {
-      lastValueStream
-          .map(t -> "<" + t.f0.getSensor() + "|" + t.f0.getParent() + ">" + t.f1)
-          .print();
-    }
-
     final DataStream<AggregatedActivePowerRecord> aggregationStream = lastValueStream
         .rebalance()
         .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(windowGrace))
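
For context on the assignTimestampsAndWatermarks call above: forBoundedOutOfOrderness tells Flink how late, in event time, records may arrive and still be assigned to their window, here driven by the WINDOW_GRACE_MS configuration. A minimal sketch, assuming a grace period of 5 seconds and assuming ActivePowerRecord exposes getTimestamp():

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Sketch only: a watermark strategy that tolerates records arriving up to
// 5 seconds out of order, using each record's own event timestamp.
final WatermarkStrategy<ActivePowerRecord> strategy =
    WatermarkStrategy.<ActivePowerRecord>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((record, recordTimestamp) -> record.getTimestamp());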
@@ -186,18 +194,18 @@ public class AggregationServiceFlinkJob {
         .name("[Map] AggregatedActivePowerRecord -> (Sensor, AggregatedActivePowerRecord)")
         .returns(Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class)))
         .addSink(kafkaAggregationSink).name("[Kafka Producer] Topic: " + outputTopic);
+  }

-    // add stdout sink
-    if (debug) {
-      aggregationStream.print();
-    }
-
+  /**
+   * Start running this microservice.
+   */
+  public void run() {
     // Execution plan
-    LOGGER.info("Execution plan: {}", env.getExecutionPlan());
+    LOGGER.info("Execution plan: {}", this.env.getExecutionPlan());

     // Execute Job
     try {
-      env.execute(applicationId);
+      this.env.execute(this.applicationId);
     } catch (final Exception e) { // NOPMD Execution thrown by Flink
       LOGGER.error("An error occured while running this job.", e);
     }
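
With this refactoring, the constructor wires up the execution environment and the pipeline, and run() only prints the execution plan and submits the job. A hypothetical entry point, not shown in this diff, therefore reduces to two steps:

public static void main(final String[] args) {
  // Construction configures the environment and builds the pipeline;
  // run() submits the job to Flink.
  new AggregationServiceFlinkJob().run();
}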