diff --git a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/GsonMapper.java b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/GsonMapper.java new file mode 100644 index 0000000000000000000000000000000000000000..831db7fe63be6529e6b7ba299dca92b138ff7d13 --- /dev/null +++ b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/GsonMapper.java @@ -0,0 +1,22 @@ +package theodolite.uc1.application; + +import com.google.gson.Gson; +import org.apache.flink.api.common.functions.MapFunction; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * {@link MapFunction} which maps {@link ActivePowerRecord}s to their representation as JSON + * strings. + */ +public class GsonMapper implements MapFunction<ActivePowerRecord, String> { + + private static final long serialVersionUID = -5263671231838353747L; // NOPMD + + private static final Gson GSON = new Gson(); + + @Override + public String map(final ActivePowerRecord value) throws Exception { + return GSON.toJson(value); + } + +} diff --git a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java index 5d506550f1d32fbb3458b7b87e630700a8171980..6655b52ec3020f46bb8a37c7124ee870fa663573 100644 --- a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java @@ -60,11 +60,8 @@ public final class HistoryServiceFlinkJob { stream .rebalance() - .map(v -> "ActivePowerRecord { " - + "identifier: " + v.getIdentifier() + ", " - + "timestamp: " + v.getTimestamp() + ", " - + "valueInW: " + v.getValueInW() + " }") - .print(); + .map(new GsonMapper()) + .flatMap((record, c) -> LOGGER.info("Record: {}", record)); } /** diff --git a/theodolite-benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java b/theodolite-benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java index 75c833aa722654395b1adc6f739395eea5256820..427a838f45f6807ede00dcb68ebf8c5580f28ce6 100644 --- a/theodolite-benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java +++ b/theodolite-benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java @@ -17,11 +17,11 @@ import titan.ccp.model.records.ActivePowerRecord; public class TopologyBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); + private static final Gson GSON = new Gson(); private final String inputTopic; private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; - private final Gson gson = new Gson(); private final StreamsBuilder builder = new StreamsBuilder(); @@ -42,8 +42,8 @@ public class TopologyBuilder { .stream(this.inputTopic, Consumed.with( Serdes.String(), this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) - .mapValues(v -> this.gson.toJson(v)) - .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); + .mapValues(v -> GSON.toJson(v)) + .foreach((k, record) -> LOGGER.info("Record: {}", record)); return this.builder.build(properties); }