diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java index 714f567c763dc8a5348aae258652371ce3da475f..fa82973ec2a361cc743c18124e4fa3b6bdbae560 100644 --- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java @@ -5,18 +5,16 @@ import java.io.IOException; import java.util.Properties; import org.apache.commons.configuration2.Configuration; import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; @@ -109,28 +107,22 @@ public class HistoryServiceFlinkJob { } env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer()); - env.getConfig().getRegisteredTypesWithKryoSerializers() .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer " + s.getSerializer().getClass().getName())); - final DataStream<ActivePowerRecord> stream = env.addSource(kafkaSource) - .name("[Kafka Consumer] Topic: " + inputTopic); - - stream + env + .addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic) .rebalance() - .keyBy((KeySelector<ActivePowerRecord, String>) ActivePowerRecord::getIdentifier) + .keyBy(ActivePowerRecord::getIdentifier) .window(TumblingEventTimeWindows.of(Time.minutes(windowDuration))) .aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction()) - .map(new MapFunction<Tuple2<String, Stats>, Tuple2<String, String>>() { - @Override - public Tuple2<String, String> map(final Tuple2<String, Stats> t) { - final String key = t.f0; - final String value = t.f1.toString(); - LOGGER.info("{}: {}", key, value); - return new Tuple2<>(key, value); - } - }).name("map") + .map(t -> { + final String key = t.f0; + final String value = t.f1.toString(); + LOGGER.info("{}: {}", key, value); + return new Tuple2<>(key, value); + }).name("map").returns(Types.TUPLE(Types.STRING, Types.STRING)) .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic); LOGGER.info("Execution plan: {}", env.getExecutionPlan());