Commit d26e223e authored by Sören Henning

Split code into multiple methods

parent 78931315
1 merge request: !90 Migrate Flink benchmark implementation
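This commit splits the former monolithic run() method of AggregationServiceFlinkJob into a constructor and dedicated setup methods. As a reading aid, here is a rough outline of the class structure after this commit, reconstructed from the diff below (method bodies are elided, so this is a sketch rather than compilable code):

    public class AggregationServiceFlinkJob {

      private final Configuration config = ServiceConfigurations.createWithDefaults();
      private final StreamExecutionEnvironment env;
      private final String applicationId;

      public AggregationServiceFlinkJob() {
        // derives applicationId from application name and version, creates the
        // StreamExecutionEnvironment, then delegates to the setup methods
        this.configureEnv();    // time characteristic, checkpointing, state backend
        this.buildPipeline();   // Kafka sources, aggregation topology, Kafka sink
      }

      private void configureEnv() { /* ... also calls configureSerializers() */ }

      private void configureSerializers() { /* Kryo serializer registration */ }

      private void buildPipeline() { /* streaming topology */ }

      public void run() { /* logs the execution plan and executes the job */ }
    }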
-package theodolite.uc4.application;
+package theodolite.uc4.application; // NOPMD Imports required

 import java.time.Duration;
 import java.util.Set;
@@ -43,29 +43,80 @@ public class AggregationServiceFlinkJob {

   private static final Logger LOGGER = LoggerFactory.getLogger(AggregationServiceFlinkJob.class);

   private final Configuration config = ServiceConfigurations.createWithDefaults();

+  private final StreamExecutionEnvironment env;
+  private final String applicationId;

-  private void run() {
-    // Configurations
+  /**
+   * Create a new {@link AggregationServiceFlinkJob}.
+   */
+  public AggregationServiceFlinkJob() {
     final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
     final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    final String applicationId = applicationName + "-" + applicationVersion;
+    this.applicationId = applicationName + "-" + applicationVersion;
+
+    // Execution environment configuration
+    // org.apache.flink.configuration.Configuration conf = new
+    // org.apache.flink.configuration.Configuration();
+    // conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+    // final StreamExecutionEnvironment env =
+    // StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    this.configureEnv();
+    this.buildPipeline();
+  }
+
+  private void configureEnv() {
+    this.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
     final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
+    if (checkpointing) {
+      this.env.enableCheckpointing(commitIntervalMs);
+    }
+
+    // State Backend
+    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
+    this.env.setStateBackend(stateBackend);
+
+    this.configureSerializers();
+  }
+
+  private void configureSerializers() {
+    this.env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class,
+        new ImmutableSensorRegistrySerializer());
+    this.env.getConfig().registerTypeWithKryoSerializer(SensorParentKey.class,
+        new SensorParentKeySerializer());
+    this.env.getConfig().registerTypeWithKryoSerializer(Set.of().getClass(),
+        new ImmutableSetSerializer());
+    this.env.getConfig().registerTypeWithKryoSerializer(Set.of(1).getClass(),
+        new ImmutableSetSerializer());
+    this.env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), // NOCS
+        new ImmutableSetSerializer());
+
+    this.env.getConfig().getRegisteredTypesWithKryoSerializers()
+        .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
+            + s.getSerializer().getClass().getName()));
+  }
+
+  private void buildPipeline() {
+    // Get configurations
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
-    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final Time windowSize =
         Time.milliseconds(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS));
     final Duration windowGrace =
         Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS));
     final String configurationTopic =
         this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC);
-    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
-    final boolean debug = this.config.getBoolean(ConfigurationKeys.DEBUG, true);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);

     final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
-        applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
+        this.applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);

     // Source from input topic with ActivePowerRecords
     final FlinkKafkaConsumer<ActivePowerRecord> kafkaInputSource =
@@ -91,50 +142,16 @@ public class AggregationServiceFlinkJob {
         () -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(),
         Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class)));

-    // Execution environment configuration
-    // org.apache.flink.configuration.Configuration conf = new
-    // org.apache.flink.configuration.Configuration();
-    // conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
-    // final StreamExecutionEnvironment env =
-    // StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
-    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-    if (checkpointing) {
-      env.enableCheckpointing(commitIntervalMs);
-    }
-
-    // State Backend
-    env.setStateBackend(stateBackend);
-
-    // Kryo serializer registration
-    env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class,
-        new ImmutableSensorRegistrySerializer());
-    env.getConfig().registerTypeWithKryoSerializer(SensorParentKey.class,
-        new SensorParentKeySerializer());
-    env.getConfig().registerTypeWithKryoSerializer(Set.of().getClass(),
-        new ImmutableSetSerializer());
-    env.getConfig().registerTypeWithKryoSerializer(Set.of(1).getClass(),
-        new ImmutableSetSerializer());
-    env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), // NOCS
-        new ImmutableSetSerializer());
-
-    env.getConfig().getRegisteredTypesWithKryoSerializers()
-        .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
-            + s.getSerializer().getClass().getName()));
-
-    // Streaming topology
     // Build input stream
-    final DataStream<ActivePowerRecord> inputStream = env.addSource(kafkaInputSource)
+    final DataStream<ActivePowerRecord> inputStream = this.env.addSource(kafkaInputSource)
         .name("[Kafka Consumer] Topic: " + inputTopic) // NOCS
         .rebalance()
         .map(r -> r)
         .name("[Map] Rebalance Forward");

     // Build aggregation stream
-    final DataStream<ActivePowerRecord> aggregationsInputStream = env.addSource(kafkaOutputSource)
+    final DataStream<ActivePowerRecord> aggregationsInputStream =
+        this.env.addSource(kafkaOutputSource)
         .name("[Kafka Consumer] Topic: " + outputTopic) // NOCS
         .rebalance()
         .map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()))
@@ -144,12 +161,9 @@ public class AggregationServiceFlinkJob {
     final DataStream<ActivePowerRecord> mergedInputStream = inputStream
         .union(aggregationsInputStream);

-    if (debug) {
-      mergedInputStream.print();
-    }
-
     // Build parent sensor stream from configuration stream
     final DataStream<Tuple2<String, Set<String>>> configurationsStream =
-        env.addSource(kafkaConfigSource)
+        this.env.addSource(kafkaConfigSource)
             .name("[Kafka Consumer] Topic: " + configurationTopic) // NOCS
             .filter(tuple -> tuple.f0 == Event.SENSOR_REGISTRY_CHANGED
                 || tuple.f0 == Event.SENSOR_REGISTRY_STATUS)
@@ -166,12 +180,6 @@ public class AggregationServiceFlinkJob {
         .flatMap(new JoinAndDuplicateCoFlatMapFunction())
         .name("[CoFlatMap] Join input-config, Flatten to ((Sensor, Group), ActivePowerRecord)");

-    if (debug) {
-      lastValueStream
-          .map(t -> "<" + t.f0.getSensor() + "|" + t.f0.getParent() + ">" + t.f1)
-          .print();
-    }
-
     final DataStream<AggregatedActivePowerRecord> aggregationStream = lastValueStream
         .rebalance()
         .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(windowGrace))
@@ -186,18 +194,18 @@ public class AggregationServiceFlinkJob {
         .name("[Map] AggregatedActivePowerRecord -> (Sensor, AggregatedActivePowerRecord)")
         .returns(Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class)))
         .addSink(kafkaAggregationSink).name("[Kafka Producer] Topic: " + outputTopic);
-
-    // add stdout sink
-    if (debug) {
-      aggregationStream.print();
-    }
   }

+  /**
+   * Start running this microservice.
+   */
+  public void run() {
     // Execution plan
-    LOGGER.info("Execution plan: {}", env.getExecutionPlan());
+    LOGGER.info("Execution plan: {}", this.env.getExecutionPlan());

     // Execute Job
     try {
-      env.execute(applicationId);
+      this.env.execute(this.applicationId);
     } catch (final Exception e) { // NOPMD Execution thrown by Flink
       LOGGER.error("An error occured while running this job.", e);
     }
...
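The remainder of the file is collapsed in this view. For orientation, a Flink job class structured this way is typically started from a small entry point that instantiates the job and calls run(); a minimal, hypothetical sketch (not the collapsed code itself, and the class name EntryPoint is illustrative only):

    package theodolite.uc4.application;

    public final class EntryPoint { // hypothetical entry point, for illustration

      public static void main(final String[] args) {
        // The constructor configures the execution environment and builds the
        // streaming pipeline; run() then submits the job to Flink.
        new AggregationServiceFlinkJob().run();
      }
    }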