Skip to content
Snippets Groups Projects
Commit 50e37cb0 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'main' into code-cleanup

parents c441517f cbf172a2
No related branches found
No related tags found
No related merge requests found
Pipeline #10200 failed
...@@ -36,16 +36,14 @@ public abstract class HazelcastJetService { ...@@ -36,16 +36,14 @@ public abstract class HazelcastJetService {
* build a new jet instance. * build a new jet instance.
*/ */
public HazelcastJetService(final Logger logger) { public HazelcastJetService(final Logger logger) {
this.jobName = this.config.getProperty(ConfigurationKeys.APPLICATION_NAME).toString(); this.jobName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
this.kafkaBootstrapServer = this.config.getProperty( this.kafkaBootstrapServer = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS).toString(); this.schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
this.schemaRegistryUrl =
this.config.getProperty(ConfigurationKeys.SCHEMA_REGISTRY_URL).toString();
this.propsBuilder = this.propsBuilder =
new KafkaPropertiesBuilder(this.kafkaBootstrapServer, this.schemaRegistryUrl, this.jobName); new KafkaPropertiesBuilder(this.kafkaBootstrapServer, this.schemaRegistryUrl, this.jobName);
this.kafkaInputTopic = this.config.getProperty(ConfigurationKeys.KAFKA_INPUT_TOPIC).toString(); this.kafkaInputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
final JetInstanceBuilder jetInstance = new JetInstanceBuilder() final JetInstanceBuilder jetInstance = new JetInstanceBuilder()
.setConfigFromEnv(logger, this.kafkaBootstrapServer, HZ_KUBERNETES_SERVICE_DNS_KEY); .setConfigFromEnv(logger, this.kafkaBootstrapServer, HZ_KUBERNETES_SERVICE_DNS_KEY);
... ...
......
...@@ -18,7 +18,6 @@ public class HistoryService extends HazelcastJetService { ...@@ -18,7 +18,6 @@ public class HistoryService extends HazelcastJetService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class); private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
/** /**
* Constructs the use case logic for UC2. Retrieves the needed values and instantiates a pipeline * Constructs the use case logic for UC2. Retrieves the needed values and instantiates a pipeline
* factory. * factory.
... ...
......
...@@ -34,16 +34,16 @@ public class HistoryService extends HazelcastJetService { ...@@ -34,16 +34,16 @@ public class HistoryService extends HazelcastJetService {
StringSerializer.class.getCanonicalName()); StringSerializer.class.getCanonicalName());
final String kafkaOutputTopic = final String kafkaOutputTopic =
this.config.getProperty(Uc3ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); this.config.getString(Uc3ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final Duration windowSize = Duration.ofDays(Integer.parseInt( final Duration windowSize = Duration.ofDays(
this.config.getProperty(Uc3ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString())); this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_DURATION_DAYS));
final Duration hoppingSize = Duration.ofDays(Integer.parseInt( final Duration hoppingSize = Duration.ofDays(
this.config.getProperty(Uc3ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString())); this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
final Duration emitPeriod = Duration.ofSeconds(Integer.parseInt( final Duration emitPeriod = Duration.ofSeconds(
this.config.getProperty(Uc3ConfigurationKeys.AGGREGATION_EMIT_PERIOD_SECONDS).toString())); this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_EMIT_PERIOD_SECONDS));
this.pipelineFactory = new Uc3PipelineFactory( this.pipelineFactory = new Uc3PipelineFactory(
kafkaProps, kafkaProps,
... ...
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment