diff --git a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java index 6d2e4fc054c2ab1c38a1377c5170925cd2e8cd98..1c30e0c2c83b3d8a2f3dca4df0c7aec99cc4f450 100644 --- a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java +++ b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java @@ -40,7 +40,7 @@ public class TopologyBuilder { this.builder .stream(this.inputTopic, Consumed.with( Serdes.String(), - this.srAvroSerdeFactory.<ActivePowerRecord>forKeys())) + this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) .mapValues(v -> this.gson.toJson(v)) .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); diff --git a/uc1-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java b/uc1-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java index f9c50f96339c27d70783213c4d3f8c218ad939c8..ad373e3498dc1e0326978e432df5568073093a2e 100644 --- a/uc1-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java +++ b/uc1-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java @@ -15,7 +15,7 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Sends monitoring records to Kafka. * - * @param <T> {@link IMonitoringRecord} to send + * @param <T> {@link SpecificRecord} to send */ public class KafkaRecordSender<T extends SpecificRecord> { @@ -72,7 +72,7 @@ public class KafkaRecordSender<T extends SpecificRecord> { this.producer = new KafkaProducer<>(properties, new StringSerializer(), - srAvroSerdeFactory.<T>forKeys().serializer()); + srAvroSerdeFactory.<T>forValues().serializer()); } /** diff --git a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java index eec3793d8658eccff38e2b064fe314967b9eac85..74eed74c52a78df229c02542bc6e66d7f796c2c7 100644 --- a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java +++ b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java @@ -50,7 +50,7 @@ public class TopologyBuilder { this.builder .stream(this.inputTopic, Consumed.with(Serdes.String(), - this.srAvroSerdeFactory.<ActivePowerRecord>forKeys())) + this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) .groupByKey() .windowedBy(TimeWindows.of(this.duration)) // .aggregate(