From 48e1879cf27c721eb2b3c1554fc962c3532a2080 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de> Date: Sat, 30 May 2020 11:07:15 +0200 Subject: [PATCH] Fix use of key serializer instead value serializer of avro --- .../java/theodolite/uc1/streamprocessing/TopologyBuilder.java | 2 +- .../main/java/theodolite/kafkasender/KafkaRecordSender.java | 4 ++-- .../java/theodolite/uc3/streamprocessing/TopologyBuilder.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java index 6d2e4fc05..1c30e0c2c 100644 --- a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java +++ b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java @@ -40,7 +40,7 @@ public class TopologyBuilder { this.builder .stream(this.inputTopic, Consumed.with( Serdes.String(), - this.srAvroSerdeFactory.<ActivePowerRecord>forKeys())) + this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) .mapValues(v -> this.gson.toJson(v)) .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); diff --git a/uc1-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java b/uc1-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java index f9c50f963..ad373e349 100644 --- a/uc1-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java +++ b/uc1-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java @@ -15,7 +15,7 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Sends monitoring records to Kafka. * - * @param <T> {@link IMonitoringRecord} to send + * @param <T> {@link SpecificRecord} to send */ public class KafkaRecordSender<T extends SpecificRecord> { @@ -72,7 +72,7 @@ public class KafkaRecordSender<T extends SpecificRecord> { this.producer = new KafkaProducer<>(properties, new StringSerializer(), - srAvroSerdeFactory.<T>forKeys().serializer()); + srAvroSerdeFactory.<T>forValues().serializer()); } /** diff --git a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java index eec3793d8..74eed74c5 100644 --- a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java +++ b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java @@ -50,7 +50,7 @@ public class TopologyBuilder { this.builder .stream(this.inputTopic, Consumed.with(Serdes.String(), - this.srAvroSerdeFactory.<ActivePowerRecord>forKeys())) + this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) .groupByKey() .windowedBy(TimeWindows.of(this.duration)) // .aggregate( -- GitLab