Skip to content
Snippets Groups Projects
Commit 1e0811c3 authored by Sören Henning's avatar Sören Henning
Browse files

Clean up code

parent 7c6ca36e
No related branches found
No related tags found
1 merge request!307Align UC4 implementations among engines
Pipeline #10195 canceled
...@@ -36,16 +36,14 @@ public abstract class HazelcastJetService { ...@@ -36,16 +36,14 @@ public abstract class HazelcastJetService {
* build a new jet instance. * build a new jet instance.
*/ */
public HazelcastJetService(final Logger logger) { public HazelcastJetService(final Logger logger) {
this.jobName = this.config.getProperty(ConfigurationKeys.APPLICATION_NAME).toString(); this.jobName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
this.kafkaBootstrapServer = this.config.getProperty( this.kafkaBootstrapServer = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS).toString(); this.schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
this.schemaRegistryUrl =
this.config.getProperty(ConfigurationKeys.SCHEMA_REGISTRY_URL).toString();
this.propsBuilder = this.propsBuilder =
new KafkaPropertiesBuilder(this.kafkaBootstrapServer, this.schemaRegistryUrl, this.jobName); new KafkaPropertiesBuilder(this.kafkaBootstrapServer, this.schemaRegistryUrl, this.jobName);
this.kafkaInputTopic = this.config.getProperty(ConfigurationKeys.KAFKA_INPUT_TOPIC).toString(); this.kafkaInputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
final JetInstanceBuilder jetInstance = new JetInstanceBuilder() final JetInstanceBuilder jetInstance = new JetInstanceBuilder()
.setConfigFromEnv(logger, this.kafkaBootstrapServer, HZ_KUBERNETES_SERVICE_DNS_KEY); .setConfigFromEnv(logger, this.kafkaBootstrapServer, HZ_KUBERNETES_SERVICE_DNS_KEY);
......
...@@ -19,7 +19,6 @@ public class HistoryService extends HazelcastJetService { ...@@ -19,7 +19,6 @@ public class HistoryService extends HazelcastJetService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class); private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
/** /**
* Constructs the use case logic for UC2. Retrieves the needed values and instantiates a pipeline * Constructs the use case logic for UC2. Retrieves the needed values and instantiates a pipeline
* factory. * factory.
...@@ -36,8 +35,7 @@ public class HistoryService extends HazelcastJetService { ...@@ -36,8 +35,7 @@ public class HistoryService extends HazelcastJetService {
StringSerializer.class.getCanonicalName(), StringSerializer.class.getCanonicalName(),
StringSerializer.class.getCanonicalName()); StringSerializer.class.getCanonicalName());
final String kafkaOutputTopic = final String kafkaOutputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
final Duration downsampleInterval = Duration.ofMinutes( final Duration downsampleInterval = Duration.ofMinutes(
this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
......
...@@ -35,16 +35,16 @@ public class HistoryService extends HazelcastJetService { ...@@ -35,16 +35,16 @@ public class HistoryService extends HazelcastJetService {
StringSerializer.class.getCanonicalName()); StringSerializer.class.getCanonicalName());
final String kafkaOutputTopic = final String kafkaOutputTopic =
this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final Duration windowSize = Duration.ofDays(Integer.parseInt( final Duration windowSize = Duration.ofDays(
this.config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString())); this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
final Duration hoppingSize = Duration.ofDays(Integer.parseInt( final Duration hoppingSize = Duration.ofDays(
this.config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString())); this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
final Duration emitPeriod = Duration.ofSeconds(Integer.parseInt( final Duration emitPeriod = Duration.ofSeconds(
this.config.getProperty(ConfigurationKeys.AGGREGATION_EMIT_PERIOD_SECONDS).toString())); this.config.getInt(ConfigurationKeys.AGGREGATION_EMIT_PERIOD_SECONDS));
this.pipelineFactory = new Uc3PipelineFactory( this.pipelineFactory = new Uc3PipelineFactory(
kafkaProps, kafkaProps,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment