Skip to content
Snippets Groups Projects
Commit 909909d3 authored by Sören Henning's avatar Sören Henning
Browse files

Simplify type hinting

parent 6e61db7a
No related branches found
No related tags found
1 merge request!90Migrate Flink benchmark implementation
Pipeline #2260 failed
......@@ -5,8 +5,6 @@ import java.util.Properties;
import org.apache.commons.configuration2.Configuration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
......@@ -71,7 +69,7 @@ public class HistoryServiceFlinkJob {
new FlinkKafkaKeyValueSerde<>(outputTopic,
Serdes::String,
Serdes::String,
TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));
Types.TUPLE(Types.STRING, Types.STRING));
kafkaProps.setProperty("transaction.timeout.ms", "" + 5 * 60 * 1000); // TODO necessary?
final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>(
outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
......
......@@ -8,8 +8,6 @@ import java.util.Properties;
import org.apache.commons.configuration2.Configuration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -87,7 +85,7 @@ public class HistoryServiceFlinkJob {
new FlinkKafkaKeyValueSerde<>(outputTopic,
Serdes::String,
Serdes::String,
TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));
Types.TUPLE(Types.STRING, Types.STRING));
final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>(
outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
......
......@@ -123,7 +123,7 @@ public class AggregationServiceFlinkJob {
outputTopic,
Serdes::String,
() -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(),
TypeInformation.of(new TypeHint<Tuple2<String, AggregatedActivePowerRecord>>() {}));
Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class)));
final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink =
new FlinkKafkaProducer<>(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment