diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java index b438d651389a09d96cc991ca10a04fc4673228e9..4e75c3a8de1f3b25b00822bdec106c8b76b43af7 100644 --- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java @@ -5,8 +5,6 @@ import java.util.Properties; import org.apache.commons.configuration2.Configuration; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; @@ -71,7 +69,7 @@ public class HistoryServiceFlinkJob { new FlinkKafkaKeyValueSerde<>(outputTopic, Serdes::String, Serdes::String, - TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})); + Types.TUPLE(Types.STRING, Types.STRING)); kafkaProps.setProperty("transaction.timeout.ms", "" + 5 * 60 * 1000); // TODO necessary? final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>( outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE); diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java index 81d7110ac031b4794487fb14cc3a30e7ac87dc0a..c788d691f3a9819fe6ded847cb2ab4ae4b718b61 100644 --- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java @@ -8,8 +8,6 @@ import java.util.Properties; import org.apache.commons.configuration2.Configuration; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; @@ -87,7 +85,7 @@ public class HistoryServiceFlinkJob { new FlinkKafkaKeyValueSerde<>(outputTopic, Serdes::String, Serdes::String, - TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})); + Types.TUPLE(Types.STRING, Types.STRING)); final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>( outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE); diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java index e2c5bbed4a4ccb72ab040a44844dd6694324668d..9ac16549b0d69942c962f080c4a9ff6d601b42e9 100644 --- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java @@ -123,7 +123,7 @@ public class AggregationServiceFlinkJob { outputTopic, Serdes::String, () -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(), - TypeInformation.of(new TypeHint<Tuple2<String, AggregatedActivePowerRecord>>() {})); + Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class))); final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink = new FlinkKafkaProducer<>(