Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • she/theodolite
1 result
Show changes
Commits on Source (8)
......@@ -36,16 +36,14 @@ public abstract class HazelcastJetService {
* build a new jet instance.
*/
public HazelcastJetService(final Logger logger) {
this.jobName = this.config.getProperty(ConfigurationKeys.APPLICATION_NAME).toString();
this.jobName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
this.kafkaBootstrapServer = this.config.getProperty(
ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS).toString();
this.schemaRegistryUrl =
this.config.getProperty(ConfigurationKeys.SCHEMA_REGISTRY_URL).toString();
this.kafkaBootstrapServer = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
this.schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
this.propsBuilder =
new KafkaPropertiesBuilder(this.kafkaBootstrapServer, this.schemaRegistryUrl, this.jobName);
this.kafkaInputTopic = this.config.getProperty(ConfigurationKeys.KAFKA_INPUT_TOPIC).toString();
this.kafkaInputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
final JetInstanceBuilder jetInstance = new JetInstanceBuilder()
.setConfigFromEnv(logger, this.kafkaBootstrapServer, HZ_KUBERNETES_SERVICE_DNS_KEY);
......
......@@ -19,7 +19,6 @@ public class HistoryService extends HazelcastJetService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
/**
* Constructs the use case logic for UC2. Retrieves the needed values and instantiates a pipeline
* factory.
......@@ -36,8 +35,7 @@ public class HistoryService extends HazelcastJetService {
StringSerializer.class.getCanonicalName(),
StringSerializer.class.getCanonicalName());
final String kafkaOutputTopic =
this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
final String kafkaOutputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final Duration downsampleInterval = Duration.ofMinutes(
this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
......
......@@ -35,16 +35,16 @@ public class HistoryService extends HazelcastJetService {
StringSerializer.class.getCanonicalName());
final String kafkaOutputTopic =
this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final Duration windowSize = Duration.ofDays(Integer.parseInt(
this.config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString()));
final Duration windowSize = Duration.ofDays(
this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
final Duration hoppingSize = Duration.ofDays(Integer.parseInt(
this.config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString()));
final Duration hoppingSize = Duration.ofDays(
this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
final Duration emitPeriod = Duration.ofSeconds(Integer.parseInt(
this.config.getProperty(ConfigurationKeys.AGGREGATION_EMIT_PERIOD_SECONDS).toString()));
final Duration emitPeriod = Duration.ofSeconds(
this.config.getInt(ConfigurationKeys.AGGREGATION_EMIT_PERIOD_SECONDS));
this.pipelineFactory = new Uc3PipelineFactory(
kafkaProps,
......