Skip to content
Snippets Groups Projects
Commit 3ce158ec authored by Sören Henning's avatar Sören Henning
Browse files

Align Flink Kafka config with others

parent 95c5a28d
No related branches found
No related tags found
No related merge requests found
Pipeline #10219 failed
......@@ -23,6 +23,8 @@ import rocks.theodolite.benchmarks.commons.flink.util.SerializableSupplier;
*/
public class KafkaConnectorFactory {
private static final String AUTO_OFFSET_RESET_EARLIEST = "earliest";
private static final Duration PRODUCER_TRANSACTION_TIMEOUT = Duration.ofMinutes(5);
private final Properties kafkaProps = new Properties();
......@@ -50,7 +52,7 @@ public class KafkaConnectorFactory {
public <T> FlinkKafkaConsumer<T> createConsumer(final String topic,
final DeserializationSchema<T> deserializationSchema) {
return this.createBaseConsumer(
new FlinkKafkaConsumer<>(topic, deserializationSchema, this.cloneProperties()));
new FlinkKafkaConsumer<>(topic, deserializationSchema, this.buildConsumerProperties()));
}
/**
......@@ -60,7 +62,7 @@ public class KafkaConnectorFactory {
public <T> FlinkKafkaConsumer<T> createConsumer(final String topic,
final KafkaDeserializationSchema<T> deserializationSchema) {
return this.createBaseConsumer(
new FlinkKafkaConsumer<>(topic, deserializationSchema, this.cloneProperties()));
new FlinkKafkaConsumer<>(topic, deserializationSchema, this.buildConsumerProperties()));
}
/**
......@@ -145,6 +147,14 @@ public class KafkaConnectorFactory {
return producerProps;
}
private Properties buildConsumerProperties() {
final Properties consumerProps = this.cloneProperties();
consumerProps.setProperty(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
AUTO_OFFSET_RESET_EARLIEST);
return consumerProps;
}
private Properties cloneProperties() {
final Properties props = new Properties();
props.putAll(this.kafkaProps);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment