Skip to content
Snippets Groups Projects
Commit f49a8895 authored by Sören Henning's avatar Sören Henning
Browse files

Unify UC1 for Flink and Kafka Streams

parent 2604b654
No related branches found
No related tags found
1 merge request!90Migrate Flink benchmark implementation
Pipeline #2359 failed
package theodolite.uc1.application;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import titan.ccp.model.records.ActivePowerRecord;
/**
* {@link MapFunction} which maps {@link ActivePowerRecord}s to their representation as JSON
* strings.
*/
public class GsonMapper implements MapFunction<ActivePowerRecord, String> {
private static final long serialVersionUID = -5263671231838353747L; // NOPMD
private static final Gson GSON = new Gson();
@Override
public String map(final ActivePowerRecord value) throws Exception {
return GSON.toJson(value);
}
}
......@@ -60,11 +60,8 @@ public final class HistoryServiceFlinkJob {
stream
.rebalance()
.map(v -> "ActivePowerRecord { "
+ "identifier: " + v.getIdentifier() + ", "
+ "timestamp: " + v.getTimestamp() + ", "
+ "valueInW: " + v.getValueInW() + " }")
.print();
.map(new GsonMapper())
.flatMap((record, c) -> LOGGER.info("Record: {}", record));
}
/**
......
......@@ -17,11 +17,11 @@ import titan.ccp.model.records.ActivePowerRecord;
public class TopologyBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private static final Gson GSON = new Gson();
private final String inputTopic;
private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
private final Gson gson = new Gson();
private final StreamsBuilder builder = new StreamsBuilder();
......@@ -42,8 +42,8 @@ public class TopologyBuilder {
.stream(this.inputTopic, Consumed.with(
Serdes.String(),
this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.mapValues(v -> this.gson.toJson(v))
.foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
.mapValues(v -> GSON.toJson(v))
.foreach((k, record) -> LOGGER.info("Record: {}", record));
return this.builder.build(properties);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment