Commit 34e01f90 authored by Lorenz Boguhn, committed by Sören Henning

Move titan-kafka into theodolite commons and ks-commons

parent a89d8deb
1 merge request: !266 Move Titan classes to Theodolite
Showing 501 additions and 0 deletions
@@ -30,6 +30,10 @@ dependencies {
     implementation 'com.google.code.gson:gson:2.8.2'
     implementation 'com.google.guava:guava:24.1-jre'
     implementation 'org.slf4j:slf4j-api:1.7.25'
+    implementation ('io.confluent:kafka-streams-avro-serde:5.3.2') {
+      // Kafka client is already included from Kafka Streams in version 2.4.0
+      exclude group: 'org.apache.kafka', module: 'kafka-clients'
+    }
     // Use JUnit test framework
     testImplementation 'org.junit.jupiter:junit-jupiter-api:5.3.1'
...
package rocks.theodolite.commons.kafka.avro;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.util.Collections;
import java.util.Map;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serde;
/**
* Factory methods to create {@link Serde}s for Avro records using the Confluent Schema Registry.
*/
public final class SchemaRegistryAvroSerdeFactory {
private static final String SCHEMA_REGISTRY_URL_KEY =
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
private final Map<String, String> serdeConfig;
public SchemaRegistryAvroSerdeFactory(final String schemaRegistryUrl) {
this.serdeConfig = Collections.singletonMap(SCHEMA_REGISTRY_URL_KEY, schemaRegistryUrl);
}
public <T extends SpecificRecord> Serde<T> forKeys() {
return this.build(true);
}
public <T extends SpecificRecord> Serde<T> forValues() {
return this.build(false);
}
private <T extends SpecificRecord> Serde<T> build(final boolean isKey) {
final Serde<T> avroSerde = new SpecificAvroSerde<>();
avroSerde.configure(this.serdeConfig, isKey);
return avroSerde;
}
}
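Usage sketch for the factory above. MyAvroRecord stands in for any generated Avro record class and the Schema Registry URL is a placeholder; neither is part of this commit.

// Sketch only: MyAvroRecord and the registry URL are placeholders.
final SchemaRegistryAvroSerdeFactory serdeFactory =
    new SchemaRegistryAvroSerdeFactory("http://localhost:8081");
final Serde<MyAvroRecord> keySerde = serdeFactory.forKeys();
final Serde<MyAvroRecord> valueSerde = serdeFactory.forValues();
// The serdes can then be handed to Kafka Streams, e.g. via Consumed.with(keySerde, valueSerde).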
package rocks.theodolite.commons.kafka.simpleserdes;
import java.nio.charset.Charset;
/**
* Shared constants between {@link WriteBuffer} and {@link ReadBuffer}.
*/
public final class BufferConstants {
public static final Charset CHARSET = Charset.forName("UTF-8");
public static final byte BOOLEAN_FALSE = 0;
public static final byte BOOLEAN_TRUE = 1;
private BufferConstants() {}
}
package rocks.theodolite.commons.kafka.simpleserdes;
/**
* Deserializer to deserialize a {@link ReadBuffer} to an object.
*
* @param <T> the type of the deserialized object
*/
@FunctionalInterface
public interface BufferDeserializer<T> {
T deserialize(ReadBuffer buffer);
}
package rocks.theodolite.commons.kafka.simpleserdes;
/**
* Combine {@link BufferSerializer} and {@link BufferDeserializer} into one type. This allows
* implementing a serde in one class.
*
* @param <T> Type of the serializer and deserializer.
*/
public interface BufferSerde<T> extends BufferSerializer<T>, BufferDeserializer<T> {
}
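A minimal sketch of how a BufferSerde could be implemented for a simple value type. The Point class with two double fields is hypothetical and only serves as an illustration.

// Hypothetical value type Point with double fields x and y.
public class PointSerde implements BufferSerde<Point> {

  @Override
  public void serialize(final WriteBuffer buffer, final Point data) {
    buffer.putDouble(data.getX());
    buffer.putDouble(data.getY());
  }

  @Override
  public Point deserialize(final ReadBuffer buffer) {
    // Read the fields in the same order they were written.
    return new Point(buffer.getDouble(), buffer.getDouble());
  }
}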
package rocks.theodolite.commons.kafka.simpleserdes;
/**
* Serializer to serialize an object to a {@link WriteBuffer}.
*
* @param <T> the type of the serialized object
*/
@FunctionalInterface
public interface BufferSerializer<T> {
void serialize(WriteBuffer buffer, T data);
}
package rocks.theodolite.commons.kafka.simpleserdes;
import java.nio.ByteBuffer;
/**
* A buffer that is constructed from a byte array and from which data can be sequentially read as
* different kinds of data types. It is the counterpart to {@link WriteBuffer}.
*/
public class ReadBuffer {
private final ByteBuffer buffer;
/**
* Create this buffer from a byte array.
*/
public ReadBuffer(final byte[] bytes) {
this.buffer = ByteBuffer.wrap(bytes);
}
public byte getByte() {
return this.buffer.get();
}
/**
* Get a length-prefixed byte array, as written by {@link WriteBuffer#putBytes(byte[])}.
*/
public byte[] getBytes() {
final int bytesLength = this.buffer.getInt();
final byte[] bytes = new byte[bytesLength];
this.buffer.get(bytes);
return bytes;
}
public boolean getBoolean() { // NOPMD
return this.getByte() == BufferConstants.BOOLEAN_TRUE;
}
public short getShort() { // NOPMD
return this.buffer.getShort();
}
public int getInt() {
return this.buffer.getInt();
}
public long getLong() {
return this.buffer.getLong();
}
public float getFloat() {
return this.buffer.getFloat();
}
public double getDouble() {
return this.buffer.getDouble();
}
public int getChar() {
return this.buffer.getChar();
}
public String getString() {
final byte[] bytes = this.getBytes();
return new String(bytes, BufferConstants.CHARSET);
}
}
package rocks.theodolite.commons.kafka.simpleserdes;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
/**
* Kafka {@link Deserializer} to be configured with a {@link BufferDeserializer} for simpler usage.
*
* @param <T> the type of the deserialized object
*/
public class SimpleDeserializer<T> implements Deserializer<T> {
private final BufferDeserializer<T> deserializer;
public SimpleDeserializer(final BufferDeserializer<T> deserializer) {
this.deserializer = deserializer;
}
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
// Do nothing
}
@Override
public T deserialize(final String topic, final byte[] bytes) {
if (bytes == null) {
return null;
}
final ReadBuffer buffer = new ReadBuffer(bytes);
return this.deserializer.deserialize(buffer);
}
@Override
public void close() {
// Do nothing
}
}
package rocks.theodolite.commons.kafka.simpleserdes;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
/**
* Factory class to create <i>Simple Serdes</i>. These are serdes created from a
* {@link BufferSerializer} and a {@link BufferDeserializer}.
*/
public final class SimpleSerdes {
private SimpleSerdes() {}
public static <T> Serde<T> create(final BufferSerde<T> serde) {
return Serdes.serdeFrom(new SimpleSerializer<>(serde), new SimpleDeserializer<>(serde));
}
public static <T> Serde<T> create(final BufferSerializer<T> serializer,
final BufferDeserializer<T> deserializer) {
return Serdes.serdeFrom(new SimpleSerializer<>(serializer),
new SimpleDeserializer<>(deserializer));
}
}
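Building on the hypothetical PointSerde sketched above, a Kafka Serde can be obtained in either of the two ways offered by this factory.

// From a combined BufferSerde implementation:
final Serde<Point> pointSerde = SimpleSerdes.create(new PointSerde());

// Or from separate serializer and deserializer lambdas:
final Serde<Point> lambdaSerde = SimpleSerdes.create(
    (buffer, point) -> {
      buffer.putDouble(point.getX());
      buffer.putDouble(point.getY());
    },
    buffer -> new Point(buffer.getDouble(), buffer.getDouble()));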
package rocks.theodolite.commons.kafka.simpleserdes;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
/**
* Kafka {@link Serializer} to be configured with a {@link BufferSerializer} for simpler usage.
*
* @param <T> the type of the serialized objects
*/
public class SimpleSerializer<T> implements Serializer<T> {
private final BufferSerializer<T> serializer;
public SimpleSerializer(final BufferSerializer<T> serializer) {
this.serializer = serializer;
}
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
// Do nothing
}
@Override
public byte[] serialize(final String topic, final T data) {
if (data == null) {
return null;
}
final WriteBuffer buffer = new WriteBuffer();
this.serializer.serialize(buffer, data);
return buffer.getBytes();
}
@Override
public void close() {
// Do nothing
}
}
package rocks.theodolite.commons.kafka.simpleserdes;
import java.nio.ByteBuffer;
/**
* A buffer to which different kinds of data types can be written and whose content can be returned
* as bytes. Its counterpart is a {@link ReadBuffer}, which allows reading the data in the order it
* was written.
*/
public class WriteBuffer {
private static final int BYTE_BUFFER_CAPACITY = 65536; // Is only virtual memory
private final ByteBuffer buffer = ByteBuffer.allocateDirect(BYTE_BUFFER_CAPACITY);
public void putByte(final byte value) {
this.buffer.put(value);
}
public void putBytes(final byte[] value) {
this.buffer.putInt(value.length);
this.buffer.put(value);
}
public void putBoolean(final boolean value) {
this.putByte(
value ? BufferConstants.BOOLEAN_TRUE : BufferConstants.BOOLEAN_FALSE);
}
public void putShort(final short value) { // NOPMD
this.buffer.putShort(value);
}
public void putInt(final int value) {
this.buffer.putInt(value);
}
public void putLong(final long value) {
this.buffer.putLong(value);
}
public void putFloat(final float value) {
this.buffer.putFloat(value);
}
public void putDouble(final double value) {
this.buffer.putDouble(value);
}
public void putChar(final char value) {
this.buffer.putChar(value);
}
public void putString(final String value) {
final byte[] bytes = value.getBytes(BufferConstants.CHARSET);
this.putBytes(bytes);
}
/**
* Get the content of this buffer as bytes.
*/
public byte[] getBytes() {
this.buffer.flip();
final byte[] bytes = new byte[this.buffer.remaining()];
this.buffer.get(bytes, 0, bytes.length);
return bytes;
}
}
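Round-trip sketch for the two buffer classes (values are placeholders): data has to be read back in exactly the order it was written.

final WriteBuffer writeBuffer = new WriteBuffer();
writeBuffer.putString("sensor-1"); // placeholder identifier
writeBuffer.putLong(1_577_836_800_000L); // placeholder timestamp
final byte[] bytes = writeBuffer.getBytes();

final ReadBuffer readBuffer = new ReadBuffer(bytes);
final String identifier = readBuffer.getString(); // "sensor-1"
final long timestamp = readBuffer.getLong(); // 1577836800000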
package rocks.theodolite.benchmarks.commons.kstreams;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
/**
* Factory methods to create generic {@link Serde}s.
*/
public final class GenericSerde {
private GenericSerde() {}
/**
* Create a {@link Serde} using a serialization and a deserialization function.
*
* @param serializer function to convert a record into a byte array
* @param deserializer function to create a record from a byte array
*/
public static <T> Serde<T> from(final Function<T, byte[]> serializer,
final Function<byte[], T> deserializer) {
return org.apache.kafka.common.serialization.Serdes.serdeFrom(new Serializer<T>() {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
// Do nothing
}
@Override
public byte[] serialize(final String topic, final T data) {
return serializer.apply(data);
}
@Override
public void close() {
// Do nothing
}
}, new Deserializer<T>() {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
// Do nothing
}
@Override
public T deserialize(final String topic, final byte[] data) {
if (data == null) {
return null;
}
return deserializer.apply(data);
}
@Override
public void close() {
// Do nothing
}
});
}
}
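A possible usage sketch for GenericSerde.from, assuming imports of java.util.UUID and java.nio.charset.StandardCharsets; encoding a UUID via its string representation is only an example.

final Serde<UUID> uuidSerde = GenericSerde.from(
    uuid -> uuid.toString().getBytes(StandardCharsets.UTF_8),
    bytes -> UUID.fromString(new String(bytes, StandardCharsets.UTF_8)));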
package rocks.theodolite.benchmarks.commons.kstreams;
import java.util.Properties;
import java.util.function.Predicate;
/**
* Interface for a helper class that constructs and logs Kafka Streams {@link Properties}.
*/
public interface PropertiesBuilder {
/**
* Set the provided configuration key to the provided value.
*/
<T> PropertiesBuilder set(String configKey, T value);
/**
* Set the provided configuration key to the provided value if the given condition evaluates to
* true.
*/
<T> PropertiesBuilder set(String configKey, T value, Predicate<T> condition);
/**
* Build a {@link Properties} object with the options that have been set before.
*/
Properties build();
/**
* Interface representing a Kafka Streams {@link Properties} builder for which the application id
* has not yet been set.
*/
interface WithoutApplicationId {
/**
* Continue building Kafka Streams properties by specifying an application id. From now on, all
* configuration properties can be set directly.
*/
PropertiesBuilder applicationId(String applicationId);
}
/**
* Start building Kafka Streams properties by specifying a bootstrap server. Next, an application
* id has to be specified.
*/
static PropertiesBuilder.WithoutApplicationId bootstrapServers(final String bootstrapServers) {
return PropertiesBuilderImpl.bootstrapServers(bootstrapServers);
}
}
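Usage sketch for the fluent builder; the server address, application id, and thread count are placeholders, and StreamsConfig is assumed to be imported.

final int numThreads = 4; // placeholder, e.g. read from a configuration
final Properties properties = PropertiesBuilder
    .bootstrapServers("localhost:9092")
    .applicationId("my-streams-application")
    .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000)
    .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads, n -> n > 0)
    .build();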
package rocks.theodolite.benchmarks.commons.kstreams;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Predicate;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Default implementation for {@link PropertiesBuilder} and
* {@link PropertiesBuilder.WithoutApplicationId}.
*/
class PropertiesBuilderImpl implements PropertiesBuilder, PropertiesBuilder.WithoutApplicationId {
private static final Logger LOGGER = LoggerFactory.getLogger(PropertiesBuilderImpl.class);
private final Properties properties = new Properties();
@Override
public <T> PropertiesBuilderImpl set(final String configKey, final T value) {
this.properties.put(configKey, value);
LOGGER.info("Set Kafka Streams configuration parameter '{}' to '{}'.", configKey, value);
return this;
}
@Override
public <T> PropertiesBuilderImpl set(final String configKey, final T value,
final Predicate<T> condition) {
if (condition.test(value)) {
this.set(configKey, value);
}
return this;
}
@Override
public Properties build() {
return this.properties;
}
@Override
public PropertiesBuilderImpl applicationId(final String applicationId) {
Objects.requireNonNull(applicationId, "Kafka Streams application ID cannot be null.");
return this.set(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
}
protected static PropertiesBuilderImpl bootstrapServers(final String bootstrapServers) {
Objects.requireNonNull(bootstrapServers, "Kafka bootstrap servers cannot be null.");
return new PropertiesBuilderImpl().set(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
}
}