Commit dbaf2cbb authored by Sören Henning

Further refactoring for complexity reduction

parent fcc123c7
1 merge request: !90 Migrate Flink benchmark implementation
Pipeline #2292 canceled
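
The commit applies one pattern across the benchmark's Flink job classes: the monolithic private run() method is split into a constructor that performs all one-time setup, small private helpers (configureEnv(), configureSerializers(), buildPipeline()), and a public run() that only submits the job. A minimal, self-contained sketch of the resulting shape (the project's configuration keys and titan-ccp helper classes are replaced by literals here; the fromElements() source and the main() method are illustrative additions, not part of the commit):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class HistoryServiceFlinkJob {

  private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);

  private final StreamExecutionEnvironment env;
  private final String applicationId;

  public HistoryServiceFlinkJob() {
    // All one-time setup now happens at construction time.
    this.applicationId = "history-service-0.0.1"; // name + "-" + version in the real code
    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
    this.configureEnv();   // checkpointing, state backend, serializers
    this.buildPipeline();  // source -> transformations -> sink
  }

  private void configureEnv() {
    // Guarded by a CHECKPOINTING configuration flag in the real code.
    this.env.enableCheckpointing(1000);
  }

  private void buildPipeline() {
    // The real pipeline consumes ActivePowerRecords from Kafka; a literal
    // source keeps this sketch runnable without a broker.
    this.env.fromElements("a", "b", "c").print();
  }

  public void run() {
    // Job submission is the only responsibility left in run().
    try {
      this.env.execute(this.applicationId);
    } catch (final Exception e) { // NOPMD
      LOGGER.error("An error occurred while running this job.", e);
    }
  }

  public static void main(final String[] args) {
    new HistoryServiceFlinkJob().run();
  }
}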
@@ -18,29 +18,45 @@ public class HistoryServiceFlinkJob {
   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
 
   private final Configuration config = ServiceConfigurations.createWithDefaults();
+  private final StreamExecutionEnvironment env;
+  private final String applicationId;
 
-  private void run() {
+  /**
+   * Create a new instance of the {@link HistoryServiceFlinkJob}.
+   */
+  public HistoryServiceFlinkJob() {
     final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
     final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    final String applicationId = applicationName + "-" + applicationVersion;
+    this.applicationId = applicationName + "-" + applicationVersion;
+
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    this.configureEnv();
+    this.buildPipeline();
+  }
+
+  private void configureEnv() {
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
+    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
+    if (checkpointing) {
+      this.env.enableCheckpointing(commitIntervalMs);
+    }
+  }
+
+  private void buildPipeline() {
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
-    final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
+    final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
-    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
     final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
-        applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
+        this.applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
 
     final FlinkKafkaConsumer<ActivePowerRecord> kafkaConsumer =
         kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
 
-    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-    if (checkpointing) {
-      env.enableCheckpointing(commitIntervalMs);
-    }
-
-    final DataStream<ActivePowerRecord> stream = env.addSource(kafkaConsumer);
+    final DataStream<ActivePowerRecord> stream = this.env.addSource(kafkaConsumer);
 
     stream
         .rebalance()
@@ -49,9 +65,14 @@ public class HistoryServiceFlinkJob {
             + "timestamp: " + v.getTimestamp() + ", "
             + "valueInW: " + v.getValueInW() + " }")
         .print();
+  }
 
+  /**
+   * Start running this microservice.
+   */
+  public void run() {
     try {
-      env.execute(applicationId);
+      this.env.execute(this.applicationId);
     } catch (final Exception e) { // NOPMD Exception thrown by Flink
       LOGGER.error("An error occurred while running this job.", e);
     }
@@ -24,27 +24,63 @@ import titan.ccp.model.records.ActivePowerRecord;
 /**
  * The History microservice implemented as a Flink job.
  */
-public class HistoryServiceFlinkJob {
+public final class HistoryServiceFlinkJob {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
 
   private final Configuration config = ServiceConfigurations.createWithDefaults();
+  private final StreamExecutionEnvironment env;
+  private final String applicationId;
 
-  private void run() {
+  /**
+   * Create a new instance of the {@link HistoryServiceFlinkJob}.
+   */
+  public HistoryServiceFlinkJob() {
     final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
     final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    final String applicationId = applicationName + "-" + applicationVersion;
+    this.applicationId = applicationName + "-" + applicationVersion;
+
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    this.configureEnv();
+    this.buildPipeline();
+  }
+
+  private void configureEnv() {
+    this.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
+    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
+    if (checkpointing) {
+      this.env.enableCheckpointing(commitIntervalMs);
+    }
+
+    // State Backend
+    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
+    this.env.setStateBackend(stateBackend);
+
+    this.configureSerializers();
+  }
+
+  private void configureSerializers() {
+    this.env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
+    this.env.getConfig().getRegisteredTypesWithKryoSerializers()
+        .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
+            + s.getSerializer().getClass().getName()));
+  }
+
+  private void buildPipeline() {
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
-    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final int windowDuration = this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
-    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
-    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
 
     final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
-        applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
+        this.applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
 
     final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource =
         kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
@@ -55,22 +91,7 @@ public class HistoryServiceFlinkJob {
         Serdes::String,
         Types.TUPLE(Types.STRING, Types.STRING));
 
-    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-    if (checkpointing) {
-      env.enableCheckpointing(commitIntervalMs);
-    }
-
-    // State Backend
-    env.setStateBackend(stateBackend);
-
-    env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
-    env.getConfig().getRegisteredTypesWithKryoSerializers()
-        .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
-            + s.getSerializer().getClass().getName()));
-
-    env
+    this.env
         .addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic)
         .rebalance()
         .keyBy(ActivePowerRecord::getIdentifier)
@@ -83,11 +104,17 @@ public class HistoryServiceFlinkJob {
           return new Tuple2<>(key, value);
         }).name("map").returns(Types.TUPLE(Types.STRING, Types.STRING))
         .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
+  }
 
-    LOGGER.info("Execution plan: {}", env.getExecutionPlan());
+  /**
+   * Start running this microservice.
+   */
+  public void run() {
+    LOGGER.info("Execution plan: {}", this.env.getExecutionPlan());
 
     try {
-      env.execute(applicationId);
+      this.env.execute(this.applicationId);
     } catch (final Exception e) { // NOPMD Exception thrown by Flink
      LOGGER.error("An error occurred while running this job.", e);
     }
@@ -38,7 +38,7 @@ import titan.ccp.model.sensorregistry.SensorRegistry;
 /**
  * The Aggregation microservice implemented as a Flink job.
  */
-public class AggregationServiceFlinkJob {
+public final class AggregationServiceFlinkJob {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(AggregationServiceFlinkJob.class);
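
Making the job classes final is not just style: their constructors now call instance methods, and in a non-final class a subclass override of such a method would run against a partially constructed object. A small, hypothetical illustration of the hazard that final rules out (names invented for the example, not from the commit):

class Base {
  Base() {
    this.init(); // dispatches to the subclass override during construction
  }

  void init() {}
}

class Sub extends Base {
  private final String name = "sub";

  @Override
  void init() {
    // Runs from Base's constructor, before 'name' is assigned: prints "null".
    System.out.println(this.name);
  }

  public static void main(final String[] args) {
    new Sub();
  }
}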