From 4bcca9fa160a89f1d04f8409a22f7098311c3bd4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Thu, 11 Mar 2021 16:18:50 +0100
Subject: [PATCH] Convert anonymous class to lambda

---
 .../application/HistoryServiceFlinkJob.java   | 33 ++++++++-----------
 1 file changed, 13 insertions(+), 20 deletions(-)

diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
index 18e54ab6e..64819070b 100644
--- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
@@ -8,10 +8,10 @@ import java.time.ZoneId;
 import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
@@ -19,7 +19,6 @@ import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDes
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -103,7 +102,6 @@ public class HistoryServiceFlinkJob {
     kafkaSink.setWriteTimestampToKafka(true);
 
     // Execution environment configuration
-
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -134,13 +132,10 @@ public class HistoryServiceFlinkJob {
     }
 
     // Streaming topology
-
     final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
-
-    final DataStream<ActivePowerRecord> stream = env.addSource(kafkaSource)
-        .name("[Kafka Consumer] Topic: " + inputTopic);
-
-    stream
+    env
+        .addSource(kafkaSource)
+        .name("[Kafka Consumer] Topic: " + inputTopic)
         .rebalance()
         .keyBy((KeySelector<ActivePowerRecord, HourOfDayKey>) record -> {
           final Instant instant = Instant.ofEpochMilli(record.getTimestamp());
@@ -149,23 +144,21 @@ public class HistoryServiceFlinkJob {
         })
         .window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance))
         .aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction())
-        .map(new MapFunction<Tuple2<HourOfDayKey, Stats>, Tuple2<String, String>>() {
-          @Override
-          public Tuple2<String, String> map(final Tuple2<HourOfDayKey, Stats> tuple) {
-            final String newKey = keyFactory.getSensorId(tuple.f0);
-            final String newValue = tuple.f1.toString();
-            final int hourOfDay = tuple.f0.getHourOfDay();
-            LOGGER.info("{}|{}: {}", newKey, hourOfDay, newValue);
-            return new Tuple2<>(newKey, newValue);
-          }
-        }).name("map")
+        .map(tuple -> {
+          final String newKey = keyFactory.getSensorId(tuple.f0);
+          final String newValue = tuple.f1.toString();
+          final int hourOfDay = tuple.f0.getHourOfDay();
+          LOGGER.info("{}|{}: {}", newKey, hourOfDay, newValue);
+          return new Tuple2<>(newKey, newValue);
+        })
+        .name("map")
+        .returns(Types.TUPLE(Types.STRING, Types.STRING))
         .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
 
     // Execution plan
     LOGGER.info("Execution Plan: {}", env.getExecutionPlan());
 
     // Execute Job
-
     try {
       env.execute(applicationId);
     } catch (final Exception e) { // NOPMD Execution thrown by Flink
-- 
GitLab