diff --git a/theodolite-benchmarks/commons/build.gradle b/theodolite-benchmarks/commons/build.gradle index 5c663fb6820b5643c41a5b85c39234fa17a785c4..2d75e824bb97c656de6123cebf84befc90b5ae54 100644 --- a/theodolite-benchmarks/commons/build.gradle +++ b/theodolite-benchmarks/commons/build.gradle @@ -30,6 +30,10 @@ dependencies { implementation 'com.google.code.gson:gson:2.8.2' implementation 'com.google.guava:guava:24.1-jre' implementation 'org.slf4j:slf4j-api:1.7.25' + implementation ('io.confluent:kafka-streams-avro-serde:5.3.2') { + // Kafka client is already included from Kafka Streams in version 2.4.0 + exclude group: 'org.apache.kafka', module: 'kafka-clients' + } // Use JUnit test framework testImplementation 'org.junit.jupiter:junit-jupiter-api:5.3.1' diff --git a/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/avro/SchemaRegistryAvroSerdeFactory.java b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/avro/SchemaRegistryAvroSerdeFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..1ba798c3a717c13bf12341080c05fe722c0d18dc --- /dev/null +++ b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/avro/SchemaRegistryAvroSerdeFactory.java @@ -0,0 +1,39 @@ +package rocks.theodolite.commons.kafka.avro; + +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import java.util.Collections; +import java.util.Map; +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.common.serialization.Serde; + +/** + * Factory methods to create {@link Serde}s for Avro records using the Confluent Schema Registry. + */ +public final class SchemaRegistryAvroSerdeFactory { + + private static final String SCHEMA_REGISTRY_URL_KEY = + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; + + private final Map<String, String> serdeConfig; + + public SchemaRegistryAvroSerdeFactory(final String schemaRegistryUrl) { + this.serdeConfig = Collections.singletonMap(SCHEMA_REGISTRY_URL_KEY, schemaRegistryUrl); + + } + + public <T extends SpecificRecord> Serde<T> forKeys() { + return this.build(true); + } + + public <T extends SpecificRecord> Serde<T> forValues() { + return this.build(false); + } + + private <T extends SpecificRecord> Serde<T> build(final boolean isKey) { + final Serde<T> avroSerde = new SpecificAvroSerde<>(); + avroSerde.configure(this.serdeConfig, isKey); + return avroSerde; + } + +} diff --git a/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/BufferConstants.java b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/BufferConstants.java new file mode 100644 index 0000000000000000000000000000000000000000..7c1a4321288dd06b63b69a251d0b5dea0ef6a506 --- /dev/null +++ b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/BufferConstants.java @@ -0,0 +1,18 @@ +package rocks.theodolite.commons.kafka.simpleserdes; + +import java.nio.charset.Charset; + +/** + * Shared constants between {@link WriteBuffer} and {@link ReadBuffer}. + */ +public final class BufferConstants { + + public static final Charset CHARSET = Charset.forName("UTF-8"); + + public static final byte BOOLEAN_FALSE = 0; + + public static final byte BOOLEAN_TRUE = 1; + + private BufferConstants() {} + +} diff --git a/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/BufferDeserializer.java b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/BufferDeserializer.java new file mode 100644 index 0000000000000000000000000000000000000000..03055c4fa78f6c6e5e46f4db4144ac76032e9679 --- /dev/null +++ b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/BufferDeserializer.java @@ -0,0 +1,13 @@ +package rocks.theodolite.commons.kafka.simpleserdes; + +/** + * Deserializer to deserialize a {@link ReadBuffer} to an object. + * + * @param <T> the type of the deserialized object + */ +@FunctionalInterface +public interface BufferDeserializer<T> { + + T deserialize(ReadBuffer buffer); + +} diff --git a/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/BufferSerde.java b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/BufferSerde.java new file mode 100644 index 0000000000000000000000000000000000000000..77c517bdfb4805fa5a01a96591345bc973a2c7ad --- /dev/null +++ b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/BufferSerde.java @@ -0,0 +1,11 @@ +package rocks.theodolite.commons.kafka.simpleserdes; + +/** + * Combine {@link BufferSerializer} and {@link BufferDeserializer} into one type. This allows + * implementing a serde in one class. + * + * @param <T> Type of the serializer and deserializer. + */ +public interface BufferSerde<T> extends BufferSerializer<T>, BufferDeserializer<T> { + +} diff --git a/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/BufferSerializer.java b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/BufferSerializer.java new file mode 100644 index 0000000000000000000000000000000000000000..9ea1cb107ca1b9fb7e1fa69f42dc4d4ee0e83fc5 --- /dev/null +++ b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/BufferSerializer.java @@ -0,0 +1,13 @@ +package rocks.theodolite.commons.kafka.simpleserdes; + +/** + * Serializer to serialize an object to a {@link WriteBuffer}. + * + * @param <T> the type of the serialized object + */ +@FunctionalInterface +public interface BufferSerializer<T> { + + void serialize(WriteBuffer buffer, T data); + +} diff --git a/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/ReadBuffer.java b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/ReadBuffer.java new file mode 100644 index 0000000000000000000000000000000000000000..ff96d69d7005f7793fb246cf8a822b3e73f65f3c --- /dev/null +++ b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/ReadBuffer.java @@ -0,0 +1,67 @@ +package rocks.theodolite.commons.kafka.simpleserdes; + +import java.nio.ByteBuffer; + +/** + * A buffer that is constructed from a byte array and data can be sequentially read to different + * kinds of data types. It is the counterpart to {@link WriteBuffer}. + */ +public class ReadBuffer { + + private final ByteBuffer buffer; + + /** + * Create this buffer by a byte array. + */ + public ReadBuffer(final byte[] bytes) { + this.buffer = ByteBuffer.wrap(bytes); + } + + public byte getByte() { + return this.buffer.get(); + } + + /** + * Get a byte array. + */ + public byte[] getBytes() { + final int bytesLength = this.buffer.getInt(); + final byte[] bytes = new byte[bytesLength]; + this.buffer.get(bytes); + return bytes; + } + + public boolean getBoolean() { // NOPMD + return this.getByte() == BufferConstants.BOOLEAN_TRUE; + } + + public short getShort() { // NOPMD + return this.buffer.getShort(); + } + + public int getInt() { + return this.buffer.getInt(); + } + + public long getLong() { + return this.buffer.getLong(); + } + + public float getFloat() { + return this.buffer.getFloat(); + } + + public double getDouble() { + return this.buffer.getDouble(); + } + + public int getChar() { + return this.buffer.getChar(); + } + + public String getString() { + final byte[] bytes = this.getBytes(); + return new String(bytes, BufferConstants.CHARSET); + } + +} diff --git a/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/SimpleDeserializer.java b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/SimpleDeserializer.java new file mode 100644 index 0000000000000000000000000000000000000000..804c806c92d591002ca71e7a4726b21081452c67 --- /dev/null +++ b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/SimpleDeserializer.java @@ -0,0 +1,38 @@ +package rocks.theodolite.commons.kafka.simpleserdes; + +import java.util.Map; +import org.apache.kafka.common.serialization.Deserializer; + +/** + * Kafka {@link Deserializer} to be configured with a {@link BufferDeserializer} for simpler usage. + * + * @param <T> the type of the deserialized object + */ +public class SimpleDeserializer<T> implements Deserializer<T> { + + private final BufferDeserializer<T> deserializer; + + public SimpleDeserializer(final BufferDeserializer<T> deserializer) { + this.deserializer = deserializer; + } + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + // Do nothing + } + + @Override + public T deserialize(final String topic, final byte[] bytes) { + if (bytes == null) { + return null; + } + final ReadBuffer buffer = new ReadBuffer(bytes); + return this.deserializer.deserialize(buffer); + } + + @Override + public void close() { + // Do nothing + } + +} diff --git a/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/SimpleSerdes.java b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/SimpleSerdes.java new file mode 100644 index 0000000000000000000000000000000000000000..19346e5e7c9fbfe165bb2c596e9b6609cb295b4b --- /dev/null +++ b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/SimpleSerdes.java @@ -0,0 +1,24 @@ +package rocks.theodolite.commons.kafka.simpleserdes; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; + +/** + * Factory class to create <i>Simple Serdes</i>. These are serdes created from a + * {@link BufferSerializer} and a {@link BufferDeserializer}. + */ +public final class SimpleSerdes { + + private SimpleSerdes() {} + + public static <T> Serde<T> create(final BufferSerde<T> serde) { + return Serdes.serdeFrom(new SimpleSerializer<>(serde), new SimpleDeserializer<>(serde)); + } + + public static <T> Serde<T> create(final BufferSerializer<T> serializer, + final BufferDeserializer<T> deserializer) { + return Serdes.serdeFrom(new SimpleSerializer<>(serializer), + new SimpleDeserializer<>(deserializer)); + } + +} diff --git a/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/SimpleSerializer.java b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/SimpleSerializer.java new file mode 100644 index 0000000000000000000000000000000000000000..a6b8f5ce4daa97bdaa1e20c39591eb15f58b3326 --- /dev/null +++ b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/SimpleSerializer.java @@ -0,0 +1,39 @@ +package rocks.theodolite.commons.kafka.simpleserdes; + +import java.util.Map; +import org.apache.kafka.common.serialization.Serializer; + +/** + * Kafka {@link Serializer} to be configured with a {@link BufferSerializer} for simpler usage. + * + * @param <T> the type of the serialized objects + */ +public class SimpleSerializer<T> implements Serializer<T> { + + private final BufferSerializer<T> serializer; + + public SimpleSerializer(final BufferSerializer<T> serializer) { + this.serializer = serializer; + } + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + // Do nothing + } + + @Override + public byte[] serialize(final String topic, final T data) { + if (data == null) { + return null; + } + final WriteBuffer buffer = new WriteBuffer(); + this.serializer.serialize(buffer, data); + return buffer.getBytes(); + } + + @Override + public void close() { + // Do nothing + } + +} diff --git a/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/WriteBuffer.java b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/WriteBuffer.java new file mode 100644 index 0000000000000000000000000000000000000000..b10a1d610f40505860be223346b4d1b2b6731830 --- /dev/null +++ b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/commons/kafka/simpleserdes/WriteBuffer.java @@ -0,0 +1,69 @@ +package rocks.theodolite.commons.kafka.simpleserdes; + +import java.nio.ByteBuffer; + +/** + * A buffer to which different kinds of data types can be written and its content can be returned as + * bytes. Its counterpart is a {@link ReadBuffer} which allows to read the data in the order it is + * written. + */ +public class WriteBuffer { + + private static final int BYTE_BUFFER_CAPACITY = 65536; // Is only virtual memory + + private final ByteBuffer buffer = ByteBuffer.allocateDirect(BYTE_BUFFER_CAPACITY); + + public void putByte(final byte value) { + this.buffer.put(value); + } + + public void putBytes(final byte[] value) { + this.buffer.putInt(value.length); + this.buffer.put(value); + } + + public void putBoolean(final boolean value) { + this.putByte( + value ? BufferConstants.BOOLEAN_TRUE : BufferConstants.BOOLEAN_FALSE); + } + + public void putShort(final short value) { // NOPMD + this.buffer.putShort(value); + } + + public void putInt(final int value) { + this.buffer.putInt(value); + } + + public void putLong(final long value) { + this.buffer.putLong(value); + } + + public void putFloat(final float value) { + this.buffer.putFloat(value); + } + + public void putDouble(final double value) { + this.buffer.putDouble(value); + } + + public void putChar(final char value) { + this.buffer.putChar(value); + } + + public void putString(final String value) { + final byte[] bytes = value.getBytes(BufferConstants.CHARSET); + this.putBytes(bytes); + } + + /** + * Get the content of this buffer as bytes. + */ + public byte[] getBytes() { + this.buffer.flip(); + final byte[] bytes = new byte[this.buffer.remaining()]; + this.buffer.get(bytes, 0, bytes.length); + return bytes; + } + +} diff --git a/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/GenericSerde.java b/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/GenericSerde.java new file mode 100644 index 0000000000000000000000000000000000000000..4fa269385e522d42f9c3af3a6f4102468380bc35 --- /dev/null +++ b/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/GenericSerde.java @@ -0,0 +1,64 @@ +package rocks.theodolite.benchmarks.commons.kstreams; + +import java.util.Map; +import java.util.function.Function; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + +/** + * Factory methods to create generic {@link Serde}s. + */ +public final class GenericSerde { + + private GenericSerde() {} + + /** + * Create a {@link Serde} using a serialize and a deserialize function. + * + * @param serializer function to convert a record into a byte array + * @param deserializer function to create a record from a byte array + */ + public static <T> Serde<T> from(final Function<T, byte[]> serializer, + final Function<byte[], T> deserializer) { + return org.apache.kafka.common.serialization.Serdes.serdeFrom(new Serializer<T>() { + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + // Do nothing + } + + @Override + public byte[] serialize(final String topic, final T data) { + return serializer.apply(data); + } + + @Override + public void close() { + // Do nothing + } + }, new Deserializer<T>() { + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + // Do nothing + } + + @Override + public T deserialize(final String topic, final byte[] data) { + if (data == null) { + return null; + } + return deserializer.apply(data); + } + + @Override + public void close() { + // Do nothing + } + + }); + + } + +} diff --git a/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/PropertiesBuilder.java b/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/PropertiesBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..78dda383a25b155c7ab1de91df9ddd6f320fbecc --- /dev/null +++ b/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/PropertiesBuilder.java @@ -0,0 +1,50 @@ +package rocks.theodolite.benchmarks.commons.kstreams; + +import java.util.Properties; +import java.util.function.Predicate; + + +/** + * Interface for a helper class for constructing and logging Kafka Stream {@link Properties}. + */ +public interface PropertiesBuilder { + + /** + * Set the provided configuration key to the provided value. + */ + <T> PropertiesBuilder set(String configKey, T value); + + /** + * Set the provided configuration key to the provided value if a given condition is evaluated to + * true. + */ + <T> PropertiesBuilder set(String configKey, T value, Predicate<T> condition); + + /** + * Build a {@link Properties} object with the option being set before. + */ + Properties build(); + + /** + * Interface representing an Kafka Stream {@link Properties} builder without the application id + * yet being set. + */ + interface WithoutApplicationId { + + /** + * Continue building Kafka Stream properties by specifying an application id. From now on, all + * configuration properties can be set directly. + */ + PropertiesBuilder applicationId(String applicationId); + + } + + /** + * Start building Kafka Stream properties by specifying a bootstrap server. Next, an application + * id has to be specified. + */ + static PropertiesBuilder.WithoutApplicationId bootstrapServers(final String bootstrapServers) { + return PropertiesBuilderImpl.bootstrapServers(bootstrapServers); + } + +} diff --git a/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/PropertiesBuilderImpl.java b/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/PropertiesBuilderImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..34459b8ea3d4c7d842e38c06cf12d7f0ffb9f14c --- /dev/null +++ b/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/PropertiesBuilderImpl.java @@ -0,0 +1,52 @@ +package rocks.theodolite.benchmarks.commons.kstreams; + +import java.util.Objects; +import java.util.Properties; +import java.util.function.Predicate; +import org.apache.kafka.streams.StreamsConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Default implementation for {@link PropertiesBuilder} and + * {@link PropertiesBuilder.WithoutApplicationId}. + */ +class PropertiesBuilderImpl implements PropertiesBuilder, PropertiesBuilder.WithoutApplicationId { + + private static final Logger LOGGER = LoggerFactory.getLogger(PropertiesBuilderImpl.class); + + private final Properties properties = new Properties(); + + @Override + public <T> PropertiesBuilderImpl set(final String configKey, final T value) { + this.properties.put(configKey, value); + LOGGER.info("Set Kafka Streams configuration parameter '{}' to '{}'.", configKey, value); + return this; + } + + @Override + public <T> PropertiesBuilderImpl set(final String configKey, final T value, + final Predicate<T> condition) { + if (condition.test(value)) { + this.set(configKey, value); + } + return this; + } + + @Override + public Properties build() { + return this.properties; + } + + @Override + public PropertiesBuilderImpl applicationId(final String applicationId) { + Objects.requireNonNull(applicationId, "Kafka Streams application ID cannot be null."); + return this.set(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + } + + protected static PropertiesBuilderImpl bootstrapServers(final String bootsrapservers) { + Objects.requireNonNull(bootsrapservers, "Kafka bootstrap servers cannot be null."); + return new PropertiesBuilderImpl().set(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootsrapservers); + } + +}