From 909909d3d1d892ea963cc78640053382790c0a86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Thu, 11 Mar 2021 19:57:40 +0100 Subject: [PATCH] Simplify type hinting --- .../theodolite/uc2/application/HistoryServiceFlinkJob.java | 4 +--- .../theodolite/uc3/application/HistoryServiceFlinkJob.java | 4 +--- .../uc4/application/AggregationServiceFlinkJob.java | 2 +- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java index b438d6513..4e75c3a8d 100644 --- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java @@ -5,8 +5,6 @@ import java.util.Properties; import org.apache.commons.configuration2.Configuration; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; @@ -71,7 +69,7 @@ public class HistoryServiceFlinkJob { new FlinkKafkaKeyValueSerde<>(outputTopic, Serdes::String, Serdes::String, - TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})); + Types.TUPLE(Types.STRING, Types.STRING)); kafkaProps.setProperty("transaction.timeout.ms", "" + 5 * 60 * 1000); // TODO necessary? final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>( outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE); diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java index 81d7110ac..c788d691f 100644 --- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java @@ -8,8 +8,6 @@ import java.util.Properties; import org.apache.commons.configuration2.Configuration; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; @@ -87,7 +85,7 @@ public class HistoryServiceFlinkJob { new FlinkKafkaKeyValueSerde<>(outputTopic, Serdes::String, Serdes::String, - TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})); + Types.TUPLE(Types.STRING, Types.STRING)); final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>( outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE); diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java index e2c5bbed4..9ac16549b 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java @@ -123,7 +123,7 @@ public class AggregationServiceFlinkJob { outputTopic, Serdes::String, () -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(), - TypeInformation.of(new TypeHint<Tuple2<String, AggregatedActivePowerRecord>>() {})); + Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class))); final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink = new FlinkKafkaProducer<>( -- GitLab