diff --git a/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs index 98b5ca8064a352aacfe2aebd13fbd0a87735fc3e..66b402b58f39b79066638ce679c27c0378d5be54 100644 --- a/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs +++ b/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs @@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true cleanup.qualify_static_method_accesses_with_declaring_class=false cleanup.remove_private_constructors=true cleanup.remove_redundant_modifiers=false -cleanup.remove_redundant_semicolons=false +cleanup.remove_redundant_semicolons=true cleanup.remove_redundant_type_arguments=true cleanup.remove_trailing_whitespaces=true cleanup.remove_trailing_whitespaces_all=true @@ -66,6 +66,7 @@ org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 org.eclipse.jdt.ui.staticondemandthreshold=99 +org.eclipse.jdt.ui.text.custom_code_templates= sp_cleanup.add_default_serial_version_id=true sp_cleanup.add_generated_serial_version_id=false sp_cleanup.add_missing_annotations=true diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkKafkaKeyValueSerde.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkKafkaKeyValueSerde.java index a09cbd210f242ea63f6281172f4a21e2d22357fe..22f615a6af4caf575af57dbe9b7f989889c4095f 100644 --- a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkKafkaKeyValueSerde.java +++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkKafkaKeyValueSerde.java @@ -1,5 +1,6 @@ package theodolite.commons.flink.serialization; +import javax.annotation.Nullable; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; @@ -7,29 +8,35 @@ import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serde; +import theodolite.commons.flink.util.SerializableSupplier; -import javax.annotation.Nullable; - +/** + * A {@link KafkaSerializationSchema} and {@link KafkaDeserializationSchema} for an arbitrary + * key-value-pair in Kafka, mapped to/from a Flink {@link Tuple2}. + * + * @param <K> Type of the key. + * @param <V> Type of the value. + */ public class FlinkKafkaKeyValueSerde<K, V> - implements KafkaDeserializationSchema<Tuple2<K, V>>, - KafkaSerializationSchema<Tuple2<K, V>> { + implements KafkaDeserializationSchema<Tuple2<K, V>>, KafkaSerializationSchema<Tuple2<K, V>> { + + private static final long serialVersionUID = 2469569396501933443L; // NOPMD - private static final long serialVersionUID = 2469569396501933443L; + private final SerializableSupplier<Serde<K>> keySerdeSupplier; + private final SerializableSupplier<Serde<V>> valueSerdeSupplier; + private final String topic; + private final TypeInformation<Tuple2<K, V>> typeInfo; private transient Serde<K> keySerde; private transient Serde<V> valueSerde; - private SerializableSupplier<Serde<K>> keySerdeSupplier; - private SerializableSupplier<Serde<V>> valueSerdeSupplier; - - private String topic; - - private TypeInformation<Tuple2<K,V>> typeInfo; - + /** + * Create a new {@link FlinkKafkaKeyValueSerde}. + */ public FlinkKafkaKeyValueSerde(final String topic, - final SerializableSupplier<Serde<K>> keySerdeSupplier, - final SerializableSupplier<Serde<V>> valueSerdeSupplier, - final TypeInformation<Tuple2<K, V>> typeInfo) { + final SerializableSupplier<Serde<K>> keySerdeSupplier, + final SerializableSupplier<Serde<V>> valueSerdeSupplier, + final TypeInformation<Tuple2<K, V>> typeInfo) { this.topic = topic; this.typeInfo = typeInfo; this.keySerdeSupplier = keySerdeSupplier; @@ -43,7 +50,7 @@ public class FlinkKafkaKeyValueSerde<K, V> @Override public Tuple2<K, V> deserialize(final ConsumerRecord<byte[], byte[]> record) { - ensureInitialized(); + this.ensureInitialized(); final K key = this.keySerde.deserializer().deserialize(this.topic, record.key()); final V value = this.valueSerde.deserializer().deserialize(this.topic, record.value()); return new Tuple2<>(key, value); @@ -55,8 +62,9 @@ public class FlinkKafkaKeyValueSerde<K, V> } @Override - public ProducerRecord<byte[], byte[]> serialize(Tuple2<K, V> element, @Nullable Long timestamp) { - ensureInitialized(); + public ProducerRecord<byte[], byte[]> serialize(final Tuple2<K, V> element, + @Nullable final Long timestamp) { + this.ensureInitialized(); final byte[] key = this.keySerde.serializer().serialize(this.topic, element.f0); final byte[] value = this.valueSerde.serializer().serialize(this.topic, element.f1); return new ProducerRecord<>(this.topic, key, value); @@ -65,7 +73,8 @@ public class FlinkKafkaKeyValueSerde<K, V> private void ensureInitialized() { if (this.keySerde == null || this.valueSerde == null) { this.keySerde = this.keySerdeSupplier.get(); - this.valueSerde = this.valueSerdeSupplier.get();; + this.valueSerde = this.valueSerdeSupplier.get(); } } + } diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/SerializableSupplier.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/SerializableSupplier.java deleted file mode 100644 index 1f535c74697507f06c97d97b1b86c1086ec1491d..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/SerializableSupplier.java +++ /dev/null @@ -1,8 +0,0 @@ -package theodolite.commons.flink.serialization; - -import java.io.Serializable; -import java.util.function.Supplier; - -public interface SerializableSupplier<T> extends Supplier<T>, Serializable { - // here be dragons -} diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java index 53d25dced2aa3b1736ace1c38c6a75bf0f34e24a..f1f9870fda73ccec0fc25c5c70665759ab07d893 100644 --- a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java +++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java @@ -9,11 +9,11 @@ import com.google.common.math.Stats; import java.io.Serializable; /** - * Custom Kryo Serializer for efficient transmission between Flink instances. + * Custom Kryo {@link Serializer} for efficient transmission between Flink instances. */ public class StatsSerializer extends Serializer<Stats> implements Serializable { - private static final long serialVersionUID = -1276866176534267373L; + private static final long serialVersionUID = -1276866176534267373L; //NOPMD @Override public void write(final Kryo kryo, final Output output, final Stats object) { diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/util/SerializableSupplier.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/util/SerializableSupplier.java new file mode 100644 index 0000000000000000000000000000000000000000..bcc51a9ef7b8bb0f36398ea401f1d2c898472081 --- /dev/null +++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/util/SerializableSupplier.java @@ -0,0 +1,13 @@ +package theodolite.commons.flink.util; + +import java.io.Serializable; +import java.util.function.Supplier; + +/** + * Interface for {@link Supplier}s which are serializable. + * + * @param <T> the type of results supplied by this supplier + */ +public interface SerializableSupplier<T> extends Supplier<T>, Serializable { // NOPMD + // Nothing to do here +}