Skip to content
Snippets Groups Projects
Commit df36a2f7 authored by Sören Henning's avatar Sören Henning
Browse files

Fix further code quality issues

parent 9802b6f4
No related branches found
No related tags found
1 merge request!90Migrate Flink benchmark implementation
Pipeline #2254 failed
...@@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true ...@@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=false cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true cleanup.remove_trailing_whitespaces_all=true
...@@ -66,6 +66,7 @@ org.eclipse.jdt.ui.ignorelowercasenames=true ...@@ -66,6 +66,7 @@ org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99 org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99 org.eclipse.jdt.ui.staticondemandthreshold=99
org.eclipse.jdt.ui.text.custom_code_templates=
sp_cleanup.add_default_serial_version_id=true sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true sp_cleanup.add_missing_annotations=true
......
package theodolite.commons.flink.serialization; package theodolite.commons.flink.serialization;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
...@@ -7,25 +8,31 @@ import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; ...@@ -7,25 +8,31 @@ import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serde;
import theodolite.commons.flink.util.SerializableSupplier;
import javax.annotation.Nullable; /**
* A {@link KafkaSerializationSchema} and {@link KafkaDeserializationSchema} for an arbitrary
* key-value-pair in Kafka, mapped to/from a Flink {@link Tuple2}.
*
* @param <K> Type of the key.
* @param <V> Type of the value.
*/
public class FlinkKafkaKeyValueSerde<K, V> public class FlinkKafkaKeyValueSerde<K, V>
implements KafkaDeserializationSchema<Tuple2<K, V>>, implements KafkaDeserializationSchema<Tuple2<K, V>>, KafkaSerializationSchema<Tuple2<K, V>> {
KafkaSerializationSchema<Tuple2<K, V>> {
private static final long serialVersionUID = 2469569396501933443L; // NOPMD
private static final long serialVersionUID = 2469569396501933443L; private final SerializableSupplier<Serde<K>> keySerdeSupplier;
private final SerializableSupplier<Serde<V>> valueSerdeSupplier;
private final String topic;
private final TypeInformation<Tuple2<K, V>> typeInfo;
private transient Serde<K> keySerde; private transient Serde<K> keySerde;
private transient Serde<V> valueSerde; private transient Serde<V> valueSerde;
private SerializableSupplier<Serde<K>> keySerdeSupplier; /**
private SerializableSupplier<Serde<V>> valueSerdeSupplier; * Create a new {@link FlinkKafkaKeyValueSerde}.
*/
private String topic;
private TypeInformation<Tuple2<K,V>> typeInfo;
public FlinkKafkaKeyValueSerde(final String topic, public FlinkKafkaKeyValueSerde(final String topic,
final SerializableSupplier<Serde<K>> keySerdeSupplier, final SerializableSupplier<Serde<K>> keySerdeSupplier,
final SerializableSupplier<Serde<V>> valueSerdeSupplier, final SerializableSupplier<Serde<V>> valueSerdeSupplier,
...@@ -43,7 +50,7 @@ public class FlinkKafkaKeyValueSerde<K, V> ...@@ -43,7 +50,7 @@ public class FlinkKafkaKeyValueSerde<K, V>
@Override @Override
public Tuple2<K, V> deserialize(final ConsumerRecord<byte[], byte[]> record) { public Tuple2<K, V> deserialize(final ConsumerRecord<byte[], byte[]> record) {
ensureInitialized(); this.ensureInitialized();
final K key = this.keySerde.deserializer().deserialize(this.topic, record.key()); final K key = this.keySerde.deserializer().deserialize(this.topic, record.key());
final V value = this.valueSerde.deserializer().deserialize(this.topic, record.value()); final V value = this.valueSerde.deserializer().deserialize(this.topic, record.value());
return new Tuple2<>(key, value); return new Tuple2<>(key, value);
...@@ -55,8 +62,9 @@ public class FlinkKafkaKeyValueSerde<K, V> ...@@ -55,8 +62,9 @@ public class FlinkKafkaKeyValueSerde<K, V>
} }
@Override @Override
public ProducerRecord<byte[], byte[]> serialize(Tuple2<K, V> element, @Nullable Long timestamp) { public ProducerRecord<byte[], byte[]> serialize(final Tuple2<K, V> element,
ensureInitialized(); @Nullable final Long timestamp) {
this.ensureInitialized();
final byte[] key = this.keySerde.serializer().serialize(this.topic, element.f0); final byte[] key = this.keySerde.serializer().serialize(this.topic, element.f0);
final byte[] value = this.valueSerde.serializer().serialize(this.topic, element.f1); final byte[] value = this.valueSerde.serializer().serialize(this.topic, element.f1);
return new ProducerRecord<>(this.topic, key, value); return new ProducerRecord<>(this.topic, key, value);
...@@ -65,7 +73,8 @@ public class FlinkKafkaKeyValueSerde<K, V> ...@@ -65,7 +73,8 @@ public class FlinkKafkaKeyValueSerde<K, V>
private void ensureInitialized() { private void ensureInitialized() {
if (this.keySerde == null || this.valueSerde == null) { if (this.keySerde == null || this.valueSerde == null) {
this.keySerde = this.keySerdeSupplier.get(); this.keySerde = this.keySerdeSupplier.get();
this.valueSerde = this.valueSerdeSupplier.get();; this.valueSerde = this.valueSerdeSupplier.get();
} }
} }
} }
...@@ -9,11 +9,11 @@ import com.google.common.math.Stats; ...@@ -9,11 +9,11 @@ import com.google.common.math.Stats;
import java.io.Serializable; import java.io.Serializable;
/** /**
* Custom Kryo Serializer for efficient transmission between Flink instances. * Custom Kryo {@link Serializer} for efficient transmission between Flink instances.
*/ */
public class StatsSerializer extends Serializer<Stats> implements Serializable { public class StatsSerializer extends Serializer<Stats> implements Serializable {
private static final long serialVersionUID = -1276866176534267373L; private static final long serialVersionUID = -1276866176534267373L; //NOPMD
@Override @Override
public void write(final Kryo kryo, final Output output, final Stats object) { public void write(final Kryo kryo, final Output output, final Stats object) {
......
package theodolite.commons.flink.serialization; package theodolite.commons.flink.util;
import java.io.Serializable; import java.io.Serializable;
import java.util.function.Supplier; import java.util.function.Supplier;
public interface SerializableSupplier<T> extends Supplier<T>, Serializable { /**
// here be dragons * Interface for {@link Supplier}s which are serializable.
*
* @param <T> the type of results supplied by this supplier
*/
public interface SerializableSupplier<T> extends Supplier<T>, Serializable { // NOPMD
// Nothing to do here
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment