Skip to content
Snippets Groups Projects
Commit 26c2c743 authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

convert activePowerRecord to JSON

parent 59e5f1e6
Branches
Tags
1 merge request!1Add Implementations of Use Cases
package uc1.streamprocessing; package uc1.streamprocessing;
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
...@@ -31,14 +32,12 @@ public class TopologyBuilder { ...@@ -31,14 +32,12 @@ public class TopologyBuilder {
* Build the {@link Topology} for the History microservice. * Build the {@link Topology} for the History microservice.
*/ */
public Topology build() { public Topology build() {
final Gson gson = new Gson();
this.builder this.builder
.stream(this.inputTopic, Consumed.with( .stream(this.inputTopic,
Serdes.String(), Consumed.with(Serdes.String(), IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) .mapValues(v -> gson.toJson(v)).foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
.mapValues(value -> value.getValueInW())
.foreach((key, measurement) -> LOGGER
.info("Key: " + key + " Value: " + measurement));
return this.builder.build(); return this.builder.build();
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment