diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java index ea7716dbc10f61f03a6c5705c9d17e2d3a12745d..5a34d17a89186630afb0917e16940210b84fd5e8 100644 --- a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java @@ -1,13 +1,10 @@ package rocks.theodolite.benchmarks.uc2.flink; import com.google.common.math.Stats; -import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.common.serialization.Serdes; @@ -16,7 +13,7 @@ import org.slf4j.LoggerFactory; import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService; import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory; import rocks.theodolite.benchmarks.commons.flink.serialization.StatsSerializer; -import titan.ccp.model.records.ActivePowerRecord; +import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; /** @@ -64,9 +61,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { // .rebalance() .keyBy(ActivePowerRecord::getIdentifier) .window(TumblingEventTimeWindows.of(windowDuration)) - .aggregate( - (AggregateFunction<ActivePowerRecord, Stats, Stats>) new StatsAggregateFunction(), - (ProcessWindowFunction<Stats, Tuple2<String, Stats>, String, TimeWindow>) new StatsProcessWindowFunction()) + .aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction()) .map(t -> { final String key = t.f0; final String value = t.f1.toString();