Skip to content
Snippets Groups Projects
Commit 70a5b2ae authored by Sören Henning's avatar Sören Henning
Browse files

Handle unavailability of Schema Registry

parent aa13021f
No related branches found
No related tags found
No related merge requests found
Pipeline #2267 canceled
...@@ -6,6 +6,7 @@ import org.apache.avro.specific.SpecificRecord; ...@@ -6,6 +6,7 @@ import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -116,7 +117,13 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender ...@@ -116,7 +117,13 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender
this.keyAccessor.apply(monitoringRecord), monitoringRecord); this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
this.producer.send(record); try {
this.producer.send(record);
} catch (final SerializationException e) {
LOGGER.warn(
"Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS
e);
}
} }
public void terminate() { public void terminate() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment