Skip to content
Snippets Groups Projects
Commit 48e1879c authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Fix use of key serializer instead value serializer of avro

parent b78c1326
Branches
Tags
2 merge requests!28Use Titan CC Avro Records in UC App and Workload Generator,!13Migrate to new Titan CC records
This commit is part of merge request !28. Comments created here will be created in the context of that merge request.
...@@ -40,7 +40,7 @@ public class TopologyBuilder { ...@@ -40,7 +40,7 @@ public class TopologyBuilder {
this.builder this.builder
.stream(this.inputTopic, Consumed.with( .stream(this.inputTopic, Consumed.with(
Serdes.String(), Serdes.String(),
this.srAvroSerdeFactory.<ActivePowerRecord>forKeys())) this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.mapValues(v -> this.gson.toJson(v)) .mapValues(v -> this.gson.toJson(v))
.foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
......
...@@ -15,7 +15,7 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; ...@@ -15,7 +15,7 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/** /**
* Sends monitoring records to Kafka. * Sends monitoring records to Kafka.
* *
* @param <T> {@link IMonitoringRecord} to send * @param <T> {@link SpecificRecord} to send
*/ */
public class KafkaRecordSender<T extends SpecificRecord> { public class KafkaRecordSender<T extends SpecificRecord> {
...@@ -72,7 +72,7 @@ public class KafkaRecordSender<T extends SpecificRecord> { ...@@ -72,7 +72,7 @@ public class KafkaRecordSender<T extends SpecificRecord> {
this.producer = new KafkaProducer<>(properties, this.producer = new KafkaProducer<>(properties,
new StringSerializer(), new StringSerializer(),
srAvroSerdeFactory.<T>forKeys().serializer()); srAvroSerdeFactory.<T>forValues().serializer());
} }
/** /**
......
...@@ -50,7 +50,7 @@ public class TopologyBuilder { ...@@ -50,7 +50,7 @@ public class TopologyBuilder {
this.builder this.builder
.stream(this.inputTopic, .stream(this.inputTopic,
Consumed.with(Serdes.String(), Consumed.with(Serdes.String(),
this.srAvroSerdeFactory.<ActivePowerRecord>forKeys())) this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.groupByKey() .groupByKey()
.windowedBy(TimeWindows.of(this.duration)) .windowedBy(TimeWindows.of(this.duration))
// .aggregate( // .aggregate(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment