Skip to content
Snippets Groups Projects
Commit 5d7d892e authored by Sören Henning's avatar Sören Henning
Browse files

Improve code quality

parent 9835b0cc
Branches
Tags
No related merge requests found
Pipeline #10155 passed
...@@ -60,7 +60,7 @@ public abstract class AbstractFlinkService { ...@@ -60,7 +60,7 @@ public abstract class AbstractFlinkService {
protected void configureCheckpointing() { protected void configureCheckpointing() {
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS); final int commitIntervalMs = this.config.getInt(ConfigurationKeys.CHECKPOINTING_INTERVAL_MS);
LOGGER.info("Set parallelism to: {}.", checkpointing); LOGGER.info("Set parallelism to: {}.", checkpointing);
if (checkpointing) { if (checkpointing) {
this.env.enableCheckpointing(commitIntervalMs); this.env.enableCheckpointing(commitIntervalMs);
... ...
......
...@@ -23,7 +23,9 @@ public final class ConfigurationKeys { ...@@ -23,7 +23,9 @@ public final class ConfigurationKeys {
public static final String WINDOW_GRACE_MS = "window.grace.ms"; public static final String WINDOW_GRACE_MS = "window.grace.ms";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; public static final String CHECKPOINTING = "checkpointing";
public static final String CHECKPOINTING_INTERVAL_MS = "commit.interval.ms";
public static final String FLINK_STATE_BACKEND = "flink.state.backend"; public static final String FLINK_STATE_BACKEND = "flink.state.backend";
...@@ -34,8 +36,6 @@ public final class ConfigurationKeys { ...@@ -34,8 +36,6 @@ public final class ConfigurationKeys {
public static final String DEBUG = "debug"; public static final String DEBUG = "debug";
public static final String CHECKPOINTING = "checkpointing";
public static final String PARALLELISM = "parallelism"; public static final String PARALLELISM = "parallelism";
private ConfigurationKeys() {} private ConfigurationKeys() {}
... ...
......
...@@ -8,7 +8,8 @@ import com.google.common.math.Stats; ...@@ -8,7 +8,8 @@ import com.google.common.math.Stats;
import java.io.Serializable; import java.io.Serializable;
/** /**
* Custom Kryo {@link Serializer} for efficient transmission between Flink instances. * Custom Kryo {@link Serializer}for {@link Stats} objects for efficient transmission between Flink
* instances.
*/ */
public class StatsSerializer extends Serializer<Stats> implements Serializable { public class StatsSerializer extends Serializer<Stats> implements Serializable {
... ...
......
...@@ -9,9 +9,8 @@ import org.slf4j.LoggerFactory; ...@@ -9,9 +9,8 @@ import org.slf4j.LoggerFactory;
import titan.ccp.common.configuration.ServiceConfigurations; import titan.ccp.common.configuration.ServiceConfigurations;
/** /**
* Abstract HazelcastJetService. * Abstract HazelcastJetService. Holds common fields and logic shared for all hazelcast jet
* Holds common fields and logic shared for all hazelcast jet services. * services. Set common settings and initiates a hazelcast jet instance.
* Set common settings and initiates a hazelcast jet instance.
*/ */
public abstract class HazelcastJetService { public abstract class HazelcastJetService {
...@@ -33,39 +32,38 @@ public abstract class HazelcastJetService { ...@@ -33,39 +32,38 @@ public abstract class HazelcastJetService {
/** /**
* Instantiate a new abstract service. * Instantiate a new abstract service. Retrieves needed fields using ServiceConfiguration and
* Retrieves needed fields using ServiceConfiguration and build a new jet instance. * build a new jet instance.
*/ */
public HazelcastJetService(final Logger logger) { public HazelcastJetService(final Logger logger) {
this.jobName = config.getProperty(ConfigurationKeys.APPLICATION_NAME).toString(); this.jobName = this.config.getProperty(ConfigurationKeys.APPLICATION_NAME).toString();
this.kafkaBootstrapServer = config.getProperty( this.kafkaBootstrapServer = this.config.getProperty(
ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS).toString(); ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS).toString();
this.schemaRegistryUrl = config.getProperty(ConfigurationKeys.SCHEMA_REGISTRY_URL).toString(); this.schemaRegistryUrl =
this.config.getProperty(ConfigurationKeys.SCHEMA_REGISTRY_URL).toString();
this.propsBuilder = this.propsBuilder =
new KafkaPropertiesBuilder(kafkaBootstrapServer, schemaRegistryUrl, jobName); new KafkaPropertiesBuilder(this.kafkaBootstrapServer, this.schemaRegistryUrl, this.jobName);
this.kafkaInputTopic = config.getProperty(ConfigurationKeys.KAFKA_INPUT_TOPIC).toString(); this.kafkaInputTopic = this.config.getProperty(ConfigurationKeys.KAFKA_INPUT_TOPIC).toString();
final JetInstanceBuilder jetInstance = new JetInstanceBuilder() final JetInstanceBuilder jetInstance = new JetInstanceBuilder()
.setConfigFromEnv(logger, kafkaBootstrapServer, HZ_KUBERNETES_SERVICE_DNS_KEY); .setConfigFromEnv(logger, this.kafkaBootstrapServer, HZ_KUBERNETES_SERVICE_DNS_KEY);
this.jetInstance = jetInstance.build(); this.jetInstance = jetInstance.build();
} }
/** /**
* Constructs and starts the pipeline. * Constructs and starts the pipeline. First, initiates a pipeline. Second, register the
* First initiates a pipeline, * corresponding serializers. Third, set the job name. Lastly, add the job to the Hazelcast
* Second register the corresponding serializers, * instance.
* Third set the job name,
* Lastly, add the job to the hazelcast instance.
*/ */
public void run() { public void run() {
try { try {
final Pipeline pipeline = pipelineFactory.buildPipeline(); final Pipeline pipeline = this.pipelineFactory.buildPipeline();
registerSerializer(); this.registerSerializer();
jobConfig.setName(config.getString("name")); this.jobConfig.setName(this.config.getString("name"));
this.jetInstance.newJobIfAbsent(pipeline, jobConfig).join(); this.jetInstance.newJobIfAbsent(pipeline, this.jobConfig).join();
} catch (final Exception e) { // NOPMD } catch (final Exception e) { // NOPMD
LOGGER.error("ABORT MISSION!:", e); LOGGER.error("ABORT MISSION!:", e);
} }
... ...
......
...@@ -42,17 +42,18 @@ public class JetInstanceBuilder { ...@@ -42,17 +42,18 @@ public class JetInstanceBuilder {
} }
/** /**
* Builds and returns a JetInstance. If a config is set, the JetInstance will contain the set * Builds and returns a {@link JetInstance}. If a config is set, the {@link JetInstance} will
* config. * contain the set config.
* *
* @return JetInstance * @return JetInstance
*/ */
public JetInstance build() { public JetInstance build() {
final JetInstance jet = Jet.newJetInstance(); final JetInstance jet = Jet.newJetInstance();
if (this.config == null) { final Config localConfig = this.config;
if (localConfig == null) {
return jet; return jet;
} else { } else {
jet.getConfig().setHazelcastConfig(this.config); jet.getConfig().setHazelcastConfig(localConfig);
return jet; return jet;
} }
... ...
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment