diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java index 4af9053be989d34775e78d43a88e3d0d24cdf411..f102bee41d66c251ecb66418dd3b90dced32cffb 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java @@ -28,11 +28,20 @@ public class KafkaActivePowerRecordReader extends final Map<String, Object> consumerConfig) { super(); + if (bootstrapServer == null) { + throw new IllegalArgumentException("bootstrapServer is null"); + } + + if (inputTopic == null) { + throw new IllegalArgumentException("inputTopic is null"); + } + // Check if boostrap server and inputTopic are defined if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) { throw new IllegalArgumentException("bootstrapServer or inputTopic missing"); } + reader = KafkaIO.<String, ActivePowerRecord>read() .withBootstrapServers(bootstrapServer)