diff --git a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java index f348543cd9897bc3abf1871ce828c22ea531dd4c..662692c732cf65965dc689be5ded8125b4488233 100644 --- a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java +++ b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java @@ -60,7 +60,7 @@ public abstract class AbstractFlinkService { protected void configureCheckpointing() { final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); - final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS); + final int commitIntervalMs = this.config.getInt(ConfigurationKeys.CHECKPOINTING_INTERVAL_MS); LOGGER.info("Set parallelism to: {}.", checkpointing); if (checkpointing) { this.env.enableCheckpointing(commitIntervalMs); diff --git a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java index 9eb143c3c07f879de37eafa2fbe6729bf182d45e..cbe2e993d66d1824117169e7986311b2663038ee 100644 --- a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java +++ b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java @@ -23,7 +23,9 @@ public final class ConfigurationKeys { public static final String WINDOW_GRACE_MS = "window.grace.ms"; - public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; + public static final String CHECKPOINTING = "checkpointing"; + + public static final String CHECKPOINTING_INTERVAL_MS = "commit.interval.ms"; public static final String FLINK_STATE_BACKEND = "flink.state.backend"; @@ -34,8 +36,6 @@ public final class ConfigurationKeys { public static final String DEBUG = "debug"; - public static final String CHECKPOINTING = "checkpointing"; - public static final String PARALLELISM = "parallelism"; private ConfigurationKeys() {} diff --git a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/serialization/StatsSerializer.java b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/serialization/StatsSerializer.java index 881f5870a4dda3085d1391aea016f61018627029..4afd05e938d3de264a6c07e156229df02bebda37 100644 --- a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/serialization/StatsSerializer.java +++ b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/serialization/StatsSerializer.java @@ -8,7 +8,8 @@ import com.google.common.math.Stats; import java.io.Serializable; /** - * Custom Kryo {@link Serializer} for efficient transmission between Flink instances. + * Custom Kryo {@link Serializer}for {@link Stats} objects for efficient transmission between Flink + * instances. */ public class StatsSerializer extends Serializer<Stats> implements Serializable { diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/HazelcastJetService.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/HazelcastJetService.java index 6e1a09f08f671cd8155c5fd68956254a1e5d4fb1..6c9beb671c3b78d244f73f278de2a1c623a1c6f8 100644 --- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/HazelcastJetService.java +++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/HazelcastJetService.java @@ -9,9 +9,8 @@ import org.slf4j.LoggerFactory; import titan.ccp.common.configuration.ServiceConfigurations; /** - * Abstract HazelcastJetService. - * Holds common fields and logic shared for all hazelcast jet services. - * Set common settings and initiates a hazelcast jet instance. + * Abstract HazelcastJetService. Holds common fields and logic shared for all hazelcast jet + * services. Set common settings and initiates a hazelcast jet instance. */ public abstract class HazelcastJetService { @@ -33,39 +32,38 @@ public abstract class HazelcastJetService { /** - * Instantiate a new abstract service. - * Retrieves needed fields using ServiceConfiguration and build a new jet instance. + * Instantiate a new abstract service. Retrieves needed fields using ServiceConfiguration and + * build a new jet instance. */ public HazelcastJetService(final Logger logger) { - this.jobName = config.getProperty(ConfigurationKeys.APPLICATION_NAME).toString(); + this.jobName = this.config.getProperty(ConfigurationKeys.APPLICATION_NAME).toString(); - this.kafkaBootstrapServer = config.getProperty( + this.kafkaBootstrapServer = this.config.getProperty( ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS).toString(); - this.schemaRegistryUrl = config.getProperty(ConfigurationKeys.SCHEMA_REGISTRY_URL).toString(); + this.schemaRegistryUrl = + this.config.getProperty(ConfigurationKeys.SCHEMA_REGISTRY_URL).toString(); this.propsBuilder = - new KafkaPropertiesBuilder(kafkaBootstrapServer, schemaRegistryUrl, jobName); + new KafkaPropertiesBuilder(this.kafkaBootstrapServer, this.schemaRegistryUrl, this.jobName); - this.kafkaInputTopic = config.getProperty(ConfigurationKeys.KAFKA_INPUT_TOPIC).toString(); + this.kafkaInputTopic = this.config.getProperty(ConfigurationKeys.KAFKA_INPUT_TOPIC).toString(); final JetInstanceBuilder jetInstance = new JetInstanceBuilder() - .setConfigFromEnv(logger, kafkaBootstrapServer, HZ_KUBERNETES_SERVICE_DNS_KEY); + .setConfigFromEnv(logger, this.kafkaBootstrapServer, HZ_KUBERNETES_SERVICE_DNS_KEY); this.jetInstance = jetInstance.build(); } /** - * Constructs and starts the pipeline. - * First initiates a pipeline, - * Second register the corresponding serializers, - * Third set the job name, - * Lastly, add the job to the hazelcast instance. + * Constructs and starts the pipeline. First, initiates a pipeline. Second, register the + * corresponding serializers. Third, set the job name. Lastly, add the job to the Hazelcast + * instance. */ public void run() { try { - final Pipeline pipeline = pipelineFactory.buildPipeline(); - registerSerializer(); - jobConfig.setName(config.getString("name")); - this.jetInstance.newJobIfAbsent(pipeline, jobConfig).join(); + final Pipeline pipeline = this.pipelineFactory.buildPipeline(); + this.registerSerializer(); + this.jobConfig.setName(this.config.getString("name")); + this.jetInstance.newJobIfAbsent(pipeline, this.jobConfig).join(); } catch (final Exception e) { // NOPMD LOGGER.error("ABORT MISSION!:", e); } diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/JetInstanceBuilder.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/JetInstanceBuilder.java index cc2ee052d5e2ed7e7b372baf7b59f24ef3e26e8f..3e78518a25abf5b2302c72d712718c1793fd44a5 100644 --- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/JetInstanceBuilder.java +++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/JetInstanceBuilder.java @@ -42,17 +42,18 @@ public class JetInstanceBuilder { } /** - * Builds and returns a JetInstance. If a config is set, the JetInstance will contain the set - * config. + * Builds and returns a {@link JetInstance}. If a config is set, the {@link JetInstance} will + * contain the set config. * * @return JetInstance */ public JetInstance build() { final JetInstance jet = Jet.newJetInstance(); - if (this.config == null) { + final Config localConfig = this.config; + if (localConfig == null) { return jet; } else { - jet.getConfig().setHazelcastConfig(this.config); + jet.getConfig().setHazelcastConfig(localConfig); return jet; }