From 26c2c7431661571b39076c6201712ee5c6fb6d8e Mon Sep 17 00:00:00 2001 From: ben <stu126940@mail.uni-kiel.de> Date: Mon, 16 Mar 2020 12:58:06 +0100 Subject: [PATCH] convert activePowerRecord to JSON --- .../uc1/streamprocessing/TopologyBuilder.java | 43 +++++++++---------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java b/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java index 140c592f4..e3b315250 100644 --- a/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java +++ b/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java @@ -1,5 +1,6 @@ package uc1.streamprocessing; +import com.google.gson.Gson; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; @@ -14,32 +15,30 @@ import titan.ccp.models.records.ActivePowerRecordFactory; */ public class TopologyBuilder { - private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); - private final String inputTopic; + private final String inputTopic; - private final StreamsBuilder builder = new StreamsBuilder(); + private final StreamsBuilder builder = new StreamsBuilder(); - /** - * Create a new {@link TopologyBuilder} using the given topics. - */ - public TopologyBuilder(final String inputTopic) { - this.inputTopic = inputTopic; - } + /** + * Create a new {@link TopologyBuilder} using the given topics. + */ + public TopologyBuilder(final String inputTopic) { + this.inputTopic = inputTopic; + } - /** - * Build the {@link Topology} for the History microservice. - */ - public Topology build() { + /** + * Build the {@link Topology} for the History microservice. + */ + public Topology build() { + final Gson gson = new Gson(); - this.builder - .stream(this.inputTopic, Consumed.with( - Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) - .mapValues(value -> value.getValueInW()) - .foreach((key, measurement) -> LOGGER - .info("Key: " + key + " Value: " + measurement)); + this.builder + .stream(this.inputTopic, + Consumed.with(Serdes.String(), IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + .mapValues(v -> gson.toJson(v)).foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); - return this.builder.build(); - } + return this.builder.build(); + } } -- GitLab