
Migrate Flink benchmark implementation

Merged · Sören Henning requested to merge flink-benchmark-migration into master
1 file  +14 −15
 package theodolite.uc1.application;
 
+import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import titan.ccp.common.configuration.Configurations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;
-import java.util.Properties;
 
 /**
- * The History Microservice Flink Job.
+ * The History microservice implemented as a Flink job.
  */
 public class HistoryServiceFlinkJob {
 
-  private final Configuration config = Configurations.create();
+  private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
+
+  private final Configuration config = ServiceConfigurations.createWithDefaults();
 
   private void run() {
     final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
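
Note: ConfigurationKeys is referenced here but not touched by this MR. For orientation only, a hypothetical sketch of such a constants class; the names and key values are assumed, not taken from this diff:

public final class ConfigurationKeys {

  // Hypothetical keys; the real class may define more entries with different values.
  public static final String APPLICATION_NAME = "application.name";
  public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";

  private ConfigurationKeys() {}
}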
@@ -32,12 +35,6 @@ public class HistoryServiceFlinkJob {
     kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
     kafkaProps.setProperty("group.id", applicationId);
 
-    /*
-     * final DeserializationSchema<ActivePowerRecord> serde = new
-     * FlinkMonitoringRecordSerde<>(inputTopic, ActivePowerRecord.class,
-     * ActivePowerRecordFactory.class);
-     */
     final DeserializationSchema<ActivePowerRecord> serde =
         ConfluentRegistryAvroDeserializationSchema.forSpecific(
             ActivePowerRecord.class,
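
Note: the hunk boundary cuts off the final argument of forSpecific, which is the Confluent Schema Registry URL. A minimal sketch of the complete call, assuming the URL is read from the service configuration; the key shown is a placeholder, not the value used in the benchmark:

final String schemaRegistryUrl = this.config.getString("schema.registry.url"); // hypothetical key
final DeserializationSchema<ActivePowerRecord> serde =
    ConfluentRegistryAvroDeserializationSchema.forSpecific(
        ActivePowerRecord.class,
        schemaRegistryUrl);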
@@ -46,12 +43,14 @@ public class HistoryServiceFlinkJob {
     final FlinkKafkaConsumer<ActivePowerRecord> kafkaConsumer =
         new FlinkKafkaConsumer<>(inputTopic, serde, kafkaProps);
     kafkaConsumer.setStartFromGroupOffsets();
-    if (checkpointing)
+    if (checkpointing) {
       kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+    }
 
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-    if (checkpointing)
+    if (checkpointing) {
       env.enableCheckpointing(commitIntervalMs);
+    }
 
     final DataStream<ActivePowerRecord> stream = env.addSource(kafkaConsumer);
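
Note: the added braces leave the behavior unchanged but make the guards checkstyle-clean. Functionally, the two guards cooperate as sketched below; inputTopic, serde, and kafkaProps are as defined above, and the two flag values shown are placeholders for what the real job reads from its configuration:

final boolean checkpointing = true; // placeholder; read from the configuration in the real job
final int commitIntervalMs = 1000;  // placeholder; checkpoint interval in milliseconds

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
if (checkpointing) {
  // Draw a checkpoint every commitIntervalMs milliseconds; on recovery the job
  // resumes from the offsets stored in the last completed checkpoint.
  env.enableCheckpointing(commitIntervalMs);
}

final FlinkKafkaConsumer<ActivePowerRecord> kafkaConsumer =
    new FlinkKafkaConsumer<>(inputTopic, serde, kafkaProps);
kafkaConsumer.setStartFromGroupOffsets();
if (checkpointing) {
  // Offsets are committed back to Kafka only when a checkpoint completes,
  // so externally visible progress matches the job's consistent state.
  kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
}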
@@ -65,8 +64,8 @@ public class HistoryServiceFlinkJob {
     try {
       env.execute(applicationId);
-    } catch (Exception e) {
-      e.printStackTrace();
+    } catch (final Exception e) { // NOPMD Exception thrown by Flink
+      LOGGER.error("An error occurred while running this job.", e);
     }
   }
 }
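
Note: LOGGER.error(String, Throwable) records both the message and the full stack trace, so nothing printStackTrace provided is lost, and the output goes through the configured logging backend instead of stderr. The NOPMD marker suppresses PMD's warning about catching the raw Exception that env.execute declares. The same idea with a parameterized message, relying on SLF4J's documented handling of a trailing Throwable:

// Equivalent sketch; SLF4J treats the trailing Throwable specially and
// still prints the full stack trace alongside the formatted message.
LOGGER.error("Job {} terminated with an error.", applicationId, e);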