Skip to content
Snippets Groups Projects
Commit 4bcca9fa authored by Sören Henning's avatar Sören Henning
Browse files

Convert anonymous class to lambda

parent ca558640
No related branches found
No related tags found
1 merge request!90Migrate Flink benchmark implementation
Pipeline #2247 failed
...@@ -8,10 +8,10 @@ import java.time.ZoneId; ...@@ -8,10 +8,10 @@ import java.time.ZoneId;
import java.util.Properties; import java.util.Properties;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
...@@ -19,7 +19,6 @@ import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDes ...@@ -19,7 +19,6 @@ import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDes
import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
...@@ -103,7 +102,6 @@ public class HistoryServiceFlinkJob { ...@@ -103,7 +102,6 @@ public class HistoryServiceFlinkJob {
kafkaSink.setWriteTimestampToKafka(true); kafkaSink.setWriteTimestampToKafka(true);
// Execution environment configuration // Execution environment configuration
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
...@@ -134,13 +132,10 @@ public class HistoryServiceFlinkJob { ...@@ -134,13 +132,10 @@ public class HistoryServiceFlinkJob {
} }
// Streaming topology // Streaming topology
final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
env
final DataStream<ActivePowerRecord> stream = env.addSource(kafkaSource) .addSource(kafkaSource)
.name("[Kafka Consumer] Topic: " + inputTopic); .name("[Kafka Consumer] Topic: " + inputTopic)
stream
.rebalance() .rebalance()
.keyBy((KeySelector<ActivePowerRecord, HourOfDayKey>) record -> { .keyBy((KeySelector<ActivePowerRecord, HourOfDayKey>) record -> {
final Instant instant = Instant.ofEpochMilli(record.getTimestamp()); final Instant instant = Instant.ofEpochMilli(record.getTimestamp());
...@@ -149,23 +144,21 @@ public class HistoryServiceFlinkJob { ...@@ -149,23 +144,21 @@ public class HistoryServiceFlinkJob {
}) })
.window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance)) .window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance))
.aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction()) .aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction())
.map(new MapFunction<Tuple2<HourOfDayKey, Stats>, Tuple2<String, String>>() { .map(tuple -> {
@Override final String newKey = keyFactory.getSensorId(tuple.f0);
public Tuple2<String, String> map(final Tuple2<HourOfDayKey, Stats> tuple) { final String newValue = tuple.f1.toString();
final String newKey = keyFactory.getSensorId(tuple.f0); final int hourOfDay = tuple.f0.getHourOfDay();
final String newValue = tuple.f1.toString(); LOGGER.info("{}|{}: {}", newKey, hourOfDay, newValue);
final int hourOfDay = tuple.f0.getHourOfDay(); return new Tuple2<>(newKey, newValue);
LOGGER.info("{}|{}: {}", newKey, hourOfDay, newValue); })
return new Tuple2<>(newKey, newValue); .name("map")
} .returns(Types.TUPLE(Types.STRING, Types.STRING))
}).name("map")
.addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic); .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
// Execution plan // Execution plan
LOGGER.info("Execution Plan: {}", env.getExecutionPlan()); LOGGER.info("Execution Plan: {}", env.getExecutionPlan());
// Execute Job // Execute Job
try { try {
env.execute(applicationId); env.execute(applicationId);
} catch (final Exception e) { // NOPMD Execution thrown by Flink } catch (final Exception e) { // NOPMD Execution thrown by Flink
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment