Skip to content
Snippets Groups Projects
Commit 5db4cce2 authored by Sören Henning's avatar Sören Henning
Browse files

Fix config capitalization

parent 15128132
No related branches found
No related tags found
No related merge requests found
Showing
with 23 additions and 24 deletions
...@@ -48,10 +48,10 @@ public abstract class AbstractPipelineFactory { ...@@ -48,10 +48,10 @@ public abstract class AbstractPipelineFactory {
final Map<String, Object> consumerConfig = new HashMap<>(); final Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put( consumerConfig.put(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT));
consumerConfig.put( consumerConfig.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG)); this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET));
consumerConfig.put( consumerConfig.put(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)); this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
......
...@@ -46,7 +46,7 @@ public class BeamService { ...@@ -46,7 +46,7 @@ public class BeamService {
* Start this microservice, by running the underlying Beam pipeline. * Start this microservice, by running the underlying Beam pipeline.
*/ */
public void run() { public void run() {
LOGGER.info("Construct Beam pipeline with pipeline options: {}", LOGGER.info("Constructing Beam pipeline with pipeline options: {}",
this.pipelineOptions.toString()); this.pipelineOptions.toString());
final Pipeline pipeline = this.pipelineFactory.create(this.pipelineOptions); final Pipeline pipeline = this.pipelineFactory.create(this.pipelineOptions);
LOGGER.info("Starting BeamService {}.", this.applicationName); LOGGER.info("Starting BeamService {}.", this.applicationName);
......
...@@ -33,16 +33,15 @@ public final class ConfigurationKeys { ...@@ -33,16 +33,15 @@ public final class ConfigurationKeys {
// BEAM // BEAM
public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit.config"; public static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset.config"; public static final String AUTO_OFFSET_RESET = "auto.offset.reset";
public static final String SPECIFIC_AVRO_READER = "specific.avro.reader"; public static final String SPECIFIC_AVRO_READER = "specific.avro.reader";
public static final String TRIGGER_INTERVAL = "trigger.interval"; public static final String TRIGGER_INTERVAL = "trigger.interval";
private ConfigurationKeys() { private ConfigurationKeys() {}
}
} }
...@@ -14,6 +14,6 @@ num.threads=1 ...@@ -14,6 +14,6 @@ num.threads=1
commit.interval.ms=1000 commit.interval.ms=1000
cache.max.bytes.buffering=-1 cache.max.bytes.buffering=-1
specific.avro.reader=True specific.avro.reader=true
enable.auto.commit.config=True enable.auto.commit=true
auto.offset.reset.config=earliest auto.offset.reset=earliest
...@@ -12,6 +12,6 @@ num.threads=1 ...@@ -12,6 +12,6 @@ num.threads=1
commit.interval.ms=1000 commit.interval.ms=1000
cache.max.bytes.buffering=-1 cache.max.bytes.buffering=-1
specific.avro.reader=True specific.avro.reader=true
enable.auto.commit.config=True enable.auto.commit=true
auto.offset.reset.config=earliest auto.offset.reset=earliest
\ No newline at end of file \ No newline at end of file
...@@ -17,6 +17,6 @@ num.threads=1 ...@@ -17,6 +17,6 @@ num.threads=1
commit.interval.ms=1000 commit.interval.ms=1000
cache.max.bytes.buffering=-1 cache.max.bytes.buffering=-1
specific.avro.reader=True specific.avro.reader=true
enable.auto.commit.config=True enable.auto.commit=true
auto.offset.reset.config=earliest auto.offset.reset=earliest
\ No newline at end of file \ No newline at end of file
...@@ -251,10 +251,10 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -251,10 +251,10 @@ public class PipelineFactory extends AbstractPipelineFactory {
final Map<String, Object> consumerConfig = new HashMap<>(); final Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put( consumerConfig.put(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT));
consumerConfig.put( consumerConfig.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG)); this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET));
consumerConfig.put( consumerConfig.put(
ConsumerConfig.GROUP_ID_CONFIG, this.config ConsumerConfig.GROUP_ID_CONFIG, this.config
.getString(ConfigurationKeys.APPLICATION_NAME) + "-configuration"); .getString(ConfigurationKeys.APPLICATION_NAME) + "-configuration");
...@@ -265,10 +265,10 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -265,10 +265,10 @@ public class PipelineFactory extends AbstractPipelineFactory {
final Map<String, Object> consumerConfig = new HashMap<>(); final Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put( consumerConfig.put(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT));
consumerConfig.put( consumerConfig.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG)); this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET));
consumerConfig.put( consumerConfig.put(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)); this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
......
...@@ -20,6 +20,6 @@ num.threads=1 ...@@ -20,6 +20,6 @@ num.threads=1
commit.interval.ms=1000 commit.interval.ms=1000
cache.max.bytes.buffering=-1 cache.max.bytes.buffering=-1
specific.avro.reader=True specific.avro.reader=true
enable.auto.commit.config=True enable.auto.commit=true
auto.offset.reset.config=earliest auto.offset.reset=earliest
\ No newline at end of file \ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment