Skip to content
Snippets Groups Projects

Draft: Feature/avro without registry

3 files
+ 68
3
Compare changes
  • Side-by-side
  • Inline
Files
3
package theodolite.uc1.streamprocessing;
package theodolite.uc1.streamprocessing;
import com.google.gson.Gson;
import com.google.gson.Gson;
 
import java.io.IOException;
 
import java.util.function.Function;
 
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Consumed;
import org.slf4j.Logger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.LoggerFactory;
 
import titan.ccp.common.kafka.GenericSerde;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.ActivePowerRecord;
@@ -37,10 +41,30 @@ public class TopologyBuilder {
@@ -37,10 +41,30 @@ public class TopologyBuilder {
* Build the {@link Topology} for the History microservice.
* Build the {@link Topology} for the History microservice.
*/
*/
public Topology build() {
public Topology build() {
 
final Function<ActivePowerRecord, byte[]> serializer = (apr) -> {
 
try {
 
return apr.toByteBuffer().array();
 
} catch (final IOException e) {
 
e.printStackTrace();
 
}
 
return null;
 
};
 
 
final Function<byte[], ActivePowerRecord> deserializer = (bytes) -> {
 
try {
 
return ActivePowerRecord.getDecoder().decode(bytes);
 
} catch (final IOException e) {
 
e.printStackTrace();
 
}
 
return null;
 
};
 
 
final Serde<ActivePowerRecord> serde = GenericSerde.from(serializer, deserializer);
 
this.builder
this.builder
.stream(this.inputTopic, Consumed.with(
.stream(this.inputTopic, Consumed.with(
Serdes.String(),
Serdes.String(),
this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
serde))
.mapValues(v -> this.gson.toJson(v))
.mapValues(v -> this.gson.toJson(v))
.foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
.foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
Loading