diff --git a/theodolite-benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java b/theodolite-benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java index dd17234bf1adb1f0fcf3ff3ab134a0743b917369..6e4a43271fbf1e0193c2d39569a0814d1f7935cd 100644 --- a/theodolite-benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java +++ b/theodolite-benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java @@ -6,6 +6,7 @@ import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,7 +117,13 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender this.keyAccessor.apply(monitoringRecord), monitoringRecord); LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); - this.producer.send(record); + try { + this.producer.send(record); + } catch (final SerializationException e) { + LOGGER.warn( + "Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS + e); + } } public void terminate() {