From 70a5b2ae09a3ef7c98a4624c2754890b333524f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Thu, 11 Mar 2021 16:28:40 +0100 Subject: [PATCH] Handle unavailability of Schema Registry --- .../commons/workloadgeneration/KafkaRecordSender.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/theodolite-benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java b/theodolite-benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java index dd17234bf..6e4a43271 100644 --- a/theodolite-benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java +++ b/theodolite-benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java @@ -6,6 +6,7 @@ import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,7 +117,13 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender this.keyAccessor.apply(monitoringRecord), monitoringRecord); LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); - this.producer.send(record); + try { + this.producer.send(record); + } catch (final SerializationException e) { + LOGGER.warn( + "Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS + e); + } } public void terminate() { -- GitLab