Skip to content
Snippets Groups Projects
Commit 277c5d0f authored by Sören Henning's avatar Sören Henning
Browse files

Add PubSub Lite load generator

parent 3eac7ae1
Branches
Tags
1 merge request!253Draft: Add PubSub Lite load generator
Pipeline #6906 passed
......@@ -24,6 +24,8 @@ dependencies {
implementation platform('com.google.cloud:libraries-bom:24.2.0')
implementation 'com.google.protobuf:protobuf-java-util'
implementation 'com.google.cloud:google-cloud-pubsub'
implementation 'com.google.cloud:google-cloud-pubsublite:1.4.11' // TODO version required?
implementation 'com.google.cloud:pubsublite-kafka:0.6.11' // TODO version required?
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
......
package rocks.theodolite.benchmarks.loadgenerator;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.kafka.ProducerSettings;
import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
/**
* Sends monitoring records to Pub/Sub Lite. This class uses a Kafka producer internally.
*
* @param <T> Record type to send.
*/
public class PubSubLiteRecordSender<T> extends KafkaRecordSenderImpl<T, byte[], byte[]> {
/**
* Create a new {@link PubSubLiteRecordSender}.
*/
public PubSubLiteRecordSender(
final TopicPath topicPath,
final Serializer<T> serializer,
final Function<T, String> keyAccessor,
final Function<T, Long> timestampAccessor) {
super(
ProducerSettings.newBuilder().setTopicPath(topicPath).build().instantiate(),
new PubSubLiteRecordFactory<T>(serializer),
topicPath.toString(),
keyAccessor,
timestampAccessor);
}
/**
* Create a {@link TopicPath} from the provided Google Cloud settings.
*/
public static TopicPath buildTopicPath(final String cloudRegion, final char zoneId,
final long projectNumber, final String topicId) {
return TopicPath.newBuilder()
.setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
.setProject(ProjectNumber.of(projectNumber))
.setName(TopicName.of(topicId))
.build();
}
private static class PubSubLiteRecordFactory<T> implements KafkaRecordFactory<T, byte[], byte[]> {
private final Serializer<T> serializer;
public PubSubLiteRecordFactory(final Serializer<T> serializer) {
this.serializer = serializer;
}
@Override
public ProducerRecord<byte[], byte[]> create(final String topic, final String key,
final T value, final long timestamp) {
return new ProducerRecord<>(
topic,
null,
timestamp,
key.getBytes(),
this.serializer.serialize(topic, value));
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment