Skip to content
Snippets Groups Projects
Commit f4c7e575 authored by Sören Henning's avatar Sören Henning
Browse files

Convert anonymous Classes to Lambda

parent ac1f7a39
No related branches found
No related tags found
1 merge request!90Migrate Flink benchmark implementation
Pipeline #2250 failed
......@@ -5,18 +5,16 @@ import java.io.IOException;
import java.util.Properties;
import org.apache.commons.configuration2.Configuration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
......@@ -109,28 +107,22 @@ public class HistoryServiceFlinkJob {
}
env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
env.getConfig().getRegisteredTypesWithKryoSerializers()
.forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
+ s.getSerializer().getClass().getName()));
final DataStream<ActivePowerRecord> stream = env.addSource(kafkaSource)
.name("[Kafka Consumer] Topic: " + inputTopic);
stream
env
.addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic)
.rebalance()
.keyBy((KeySelector<ActivePowerRecord, String>) ActivePowerRecord::getIdentifier)
.keyBy(ActivePowerRecord::getIdentifier)
.window(TumblingEventTimeWindows.of(Time.minutes(windowDuration)))
.aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction())
.map(new MapFunction<Tuple2<String, Stats>, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(final Tuple2<String, Stats> t) {
final String key = t.f0;
final String value = t.f1.toString();
LOGGER.info("{}: {}", key, value);
return new Tuple2<>(key, value);
}
}).name("map")
.map(t -> {
final String key = t.f0;
final String value = t.f1.toString();
LOGGER.info("{}: {}", key, value);
return new Tuple2<>(key, value);
}).name("map").returns(Types.TUPLE(Types.STRING, Types.STRING))
.addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
LOGGER.info("Execution plan: {}", env.getExecutionPlan());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment