Skip to content
Snippets Groups Projects
Commit aa07fc73 authored by Sören Henning's avatar Sören Henning
Browse files

Fix compilation issue

parent 25e76502
No related branches found
No related tags found
1 merge request!272Introduce Abstract Flink Service Class
Pipeline #8633 passed
package rocks.theodolite.benchmarks.uc2.flink; package rocks.theodolite.benchmarks.uc2.flink;
import com.google.common.math.Stats; import com.google.common.math.Stats;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
...@@ -16,7 +13,7 @@ import org.slf4j.LoggerFactory; ...@@ -16,7 +13,7 @@ import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService; import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService;
import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory; import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory;
import rocks.theodolite.benchmarks.commons.flink.serialization.StatsSerializer; import rocks.theodolite.benchmarks.commons.flink.serialization.StatsSerializer;
import titan.ccp.model.records.ActivePowerRecord; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
/** /**
...@@ -64,9 +61,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { ...@@ -64,9 +61,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
// .rebalance() // .rebalance()
.keyBy(ActivePowerRecord::getIdentifier) .keyBy(ActivePowerRecord::getIdentifier)
.window(TumblingEventTimeWindows.of(windowDuration)) .window(TumblingEventTimeWindows.of(windowDuration))
.aggregate( .aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction())
(AggregateFunction<ActivePowerRecord, Stats, Stats>) new StatsAggregateFunction(),
(ProcessWindowFunction<Stats, Tuple2<String, Stats>, String, TimeWindow>) new StatsProcessWindowFunction())
.map(t -> { .map(t -> {
final String key = t.f0; final String key = t.f0;
final String value = t.f1.toString(); final String value = t.f1.toString();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment