From f49a8895be7a57c9938c30a1550749245494e823 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Tue, 16 Mar 2021 11:12:04 +0100 Subject: [PATCH] Unify UC1 for Flink and Kafka Streams --- .../uc1/application/GsonMapper.java | 22 +++++++++++++++++++ .../application/HistoryServiceFlinkJob.java | 7 ++---- .../uc1/streamprocessing/TopologyBuilder.java | 6 ++--- 3 files changed, 27 insertions(+), 8 deletions(-) create mode 100644 theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/GsonMapper.java diff --git a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/GsonMapper.java b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/GsonMapper.java new file mode 100644 index 000000000..831db7fe6 --- /dev/null +++ b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/GsonMapper.java @@ -0,0 +1,22 @@ +package theodolite.uc1.application; + +import com.google.gson.Gson; +import org.apache.flink.api.common.functions.MapFunction; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * {@link MapFunction} which maps {@link ActivePowerRecord}s to their representation as JSON + * strings. + */ +public class GsonMapper implements MapFunction<ActivePowerRecord, String> { + + private static final long serialVersionUID = -5263671231838353747L; // NOPMD + + private static final Gson GSON = new Gson(); + + @Override + public String map(final ActivePowerRecord value) throws Exception { + return GSON.toJson(value); + } + +} diff --git a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java index 5d506550f..6655b52ec 100644 --- a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java @@ -60,11 +60,8 @@ public final class HistoryServiceFlinkJob { stream .rebalance() - .map(v -> "ActivePowerRecord { " - + "identifier: " + v.getIdentifier() + ", " - + "timestamp: " + v.getTimestamp() + ", " - + "valueInW: " + v.getValueInW() + " }") - .print(); + .map(new GsonMapper()) + .flatMap((record, c) -> LOGGER.info("Record: {}", record)); } /** diff --git a/theodolite-benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java b/theodolite-benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java index 75c833aa7..427a838f4 100644 --- a/theodolite-benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java +++ b/theodolite-benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java @@ -17,11 +17,11 @@ import titan.ccp.model.records.ActivePowerRecord; public class TopologyBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); + private static final Gson GSON = new Gson(); private final String inputTopic; private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; - private final Gson gson = new Gson(); private final StreamsBuilder builder = new StreamsBuilder(); @@ -42,8 +42,8 @@ public class TopologyBuilder { .stream(this.inputTopic, Consumed.with( Serdes.String(), this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) - .mapValues(v -> this.gson.toJson(v)) - .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); + .mapValues(v -> GSON.toJson(v)) + .foreach((k, record) -> LOGGER.info("Record: {}", record)); return this.builder.build(properties); } -- GitLab