diff --git a/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java b/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java index 140c592f4e33334fbced6a80b82173c00d19eb25..e3b31525094859f011097a0e8c1c28b5ed6e7330 100644 --- a/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java +++ b/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java @@ -1,5 +1,6 @@ package uc1.streamprocessing; +import com.google.gson.Gson; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; @@ -14,32 +15,30 @@ import titan.ccp.models.records.ActivePowerRecordFactory; */ public class TopologyBuilder { - private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); - private final String inputTopic; + private final String inputTopic; - private final StreamsBuilder builder = new StreamsBuilder(); + private final StreamsBuilder builder = new StreamsBuilder(); - /** - * Create a new {@link TopologyBuilder} using the given topics. - */ - public TopologyBuilder(final String inputTopic) { - this.inputTopic = inputTopic; - } + /** + * Create a new {@link TopologyBuilder} using the given topics. + */ + public TopologyBuilder(final String inputTopic) { + this.inputTopic = inputTopic; + } - /** - * Build the {@link Topology} for the History microservice. - */ - public Topology build() { + /** + * Build the {@link Topology} for the History microservice. + */ + public Topology build() { + final Gson gson = new Gson(); - this.builder - .stream(this.inputTopic, Consumed.with( - Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) - .mapValues(value -> value.getValueInW()) - .foreach((key, measurement) -> LOGGER - .info("Key: " + key + " Value: " + measurement)); + this.builder + .stream(this.inputTopic, + Consumed.with(Serdes.String(), IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + .mapValues(v -> gson.toJson(v)).foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); - return this.builder.build(); - } + return this.builder.build(); + } }