diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
index a539608e2e8273dc57792d9eeea0dfb3155b10fb..25e197c7c8ec8858f2dd8d0f96dfb30191cfbbe2 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
@@ -1,4 +1,4 @@
-package theodolite.uc4.application;
+package theodolite.uc4.application; // NOPMD Imports required
 
 import java.time.Duration;
 import java.util.Set;
@@ -43,29 +43,80 @@ public class AggregationServiceFlinkJob {
   private static final Logger LOGGER = LoggerFactory.getLogger(AggregationServiceFlinkJob.class);
 
   private final Configuration config = ServiceConfigurations.createWithDefaults();
+  private final StreamExecutionEnvironment env;
+  private final String applicationId;
 
-  private void run() {
-    // Configurations
+  /**
+   * Create a new {@link AggregationServiceFlinkJob}.
+   */
+  public AggregationServiceFlinkJob() {
     final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
     final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    final String applicationId = applicationName + "-" + applicationVersion;
+    this.applicationId = applicationName + "-" + applicationVersion;
+
+    // Execution environment configuration
+    // org.apache.flink.configuration.Configuration conf = new
+    // org.apache.flink.configuration.Configuration();
+    // conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+    // final StreamExecutionEnvironment env =
+    // StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    this.configureEnv();
+
+    this.buildPipeline();
+  }
+
+  private void configureEnv() {
+    this.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
     final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
+    if (checkpointing) {
+      this.env.enableCheckpointing(commitIntervalMs);
+    }
+
+    // State Backend
+    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
+    this.env.setStateBackend(stateBackend);
+
+    this.configureSerializers();
+  }
+
+  private void configureSerializers() {
+    this.env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class,
+        new ImmutableSensorRegistrySerializer());
+    this.env.getConfig().registerTypeWithKryoSerializer(SensorParentKey.class,
+        new SensorParentKeySerializer());
+
+    this.env.getConfig().registerTypeWithKryoSerializer(Set.of().getClass(),
+        new ImmutableSetSerializer());
+    this.env.getConfig().registerTypeWithKryoSerializer(Set.of(1).getClass(),
+        new ImmutableSetSerializer());
+    this.env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), // NOCS
+        new ImmutableSetSerializer());
+
+    this.env.getConfig().getRegisteredTypesWithKryoSerializers()
+        .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
+            + s.getSerializer().getClass().getName()));
+  }
+
+  private void buildPipeline() {
+    // Get configurations
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
-    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final Time windowSize =
         Time.milliseconds(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS));
     final Duration windowGrace =
         Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS));
     final String configurationTopic =
         this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC);
-    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
-    final boolean debug = this.config.getBoolean(ConfigurationKeys.DEBUG, true);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
     final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
-        applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
+        this.applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
 
     // Source from input topic with ActivePowerRecords
     final FlinkKafkaConsumer<ActivePowerRecord> kafkaInputSource =
@@ -91,65 +142,28 @@ public class AggregationServiceFlinkJob {
             () -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(),
             Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class)));
 
-    // Execution environment configuration
-    // org.apache.flink.configuration.Configuration conf = new
-    // org.apache.flink.configuration.Configuration();
-    // conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
-    // final StreamExecutionEnvironment env =
-    // StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
-    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-    if (checkpointing) {
-      env.enableCheckpointing(commitIntervalMs);
-    }
-
-    // State Backend
-    env.setStateBackend(stateBackend);
-
-    // Kryo serializer registration
-    env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class,
-        new ImmutableSensorRegistrySerializer());
-    env.getConfig().registerTypeWithKryoSerializer(SensorParentKey.class,
-        new SensorParentKeySerializer());
-
-    env.getConfig().registerTypeWithKryoSerializer(Set.of().getClass(),
-        new ImmutableSetSerializer());
-    env.getConfig().registerTypeWithKryoSerializer(Set.of(1).getClass(),
-        new ImmutableSetSerializer());
-    env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), // NOCS
-        new ImmutableSetSerializer());
-
-    env.getConfig().getRegisteredTypesWithKryoSerializers()
-        .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
-            + s.getSerializer().getClass().getName()));
-
-    // Streaming topology
-    // Build input stream
-    final DataStream<ActivePowerRecord> inputStream = env.addSource(kafkaInputSource)
+    final DataStream<ActivePowerRecord> inputStream = this.env.addSource(kafkaInputSource)
         .name("[Kafka Consumer] Topic: " + inputTopic)// NOCS
         .rebalance()
        .map(r -> r)
         .name("[Map] Rebalance Forward");
 
     // Build aggregation stream
-    final DataStream<ActivePowerRecord> aggregationsInputStream = env.addSource(kafkaOutputSource)
-        .name("[Kafka Consumer] Topic: " + outputTopic) // NOCS
-        .rebalance()
-        .map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()))
-        .name("[Map] AggregatedActivePowerRecord -> ActivePowerRecord");
+    final DataStream<ActivePowerRecord> aggregationsInputStream =
+        this.env.addSource(kafkaOutputSource)
+            .name("[Kafka Consumer] Topic: " + outputTopic) // NOCS
+            .rebalance()
+            .map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()))
+            .name("[Map] AggregatedActivePowerRecord -> ActivePowerRecord");
 
     // Merge input and aggregation streams
     final DataStream<ActivePowerRecord> mergedInputStream = inputStream
         .union(aggregationsInputStream);
-    if (debug) {
-      mergedInputStream.print();
-    }
 
     // Build parent sensor stream from configuration stream
     final DataStream<Tuple2<String, Set<String>>> configurationsStream =
-        env.addSource(kafkaConfigSource)
+        this.env.addSource(kafkaConfigSource)
            .name("[Kafka Consumer] Topic: " + configurationTopic) // NOCS
            .filter(tuple -> tuple.f0 == Event.SENSOR_REGISTRY_CHANGED
                || tuple.f0 == Event.SENSOR_REGISTRY_STATUS)
@@ -166,12 +180,6 @@ public class AggregationServiceFlinkJob {
         .flatMap(new JoinAndDuplicateCoFlatMapFunction())
         .name("[CoFlatMap] Join input-config, Flatten to ((Sensor, Group), ActivePowerRecord)");
 
-    if (debug) {
-      lastValueStream
-          .map(t -> "<" + t.f0.getSensor() + "|" + t.f0.getParent() + ">" + t.f1)
-          .print();
-    }
-
     final DataStream<AggregatedActivePowerRecord> aggregationStream = lastValueStream
         .rebalance()
         .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(windowGrace))
@@ -186,18 +194,18 @@ public class AggregationServiceFlinkJob {
         .name("[Map] AggregatedActivePowerRecord -> (Sensor, AggregatedActivePowerRecord)")
         .returns(Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class)))
         .addSink(kafkaAggregationSink).name("[Kafka Producer] Topic: " + outputTopic);
+  }
 
-    // add stdout sink
-    if (debug) {
-      aggregationStream.print();
-    }
-
+  /**
+   * Start running this microservice.
+   */
+  public void run() {
     // Execution plan
-    LOGGER.info("Execution plan: {}", env.getExecutionPlan());
+    LOGGER.info("Execution plan: {}", this.env.getExecutionPlan());
 
     // Execute Job
     try {
-      env.execute(applicationId);
+      this.env.execute(this.applicationId);
     } catch (final Exception e) { // NOPMD Execution thrown by Flink
       LOGGER.error("An error occured while running this job.", e);
     }