Skip to content
Snippets Groups Projects

Draft: Add PubSub Lite load generator

Open Sören Henning requested to merge pubsublite-load-generator into main
2 files
+ 71
0
Compare changes
  • Side-by-side
  • Inline
Files
2
package rocks.theodolite.benchmarks.loadgenerator;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.kafka.ProducerSettings;
import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
/**
* Sends monitoring records to Pub/Sub Lite. This class uses a Kafka producer internally.
*
* @param <T> Record type to send.
*/
public class PubSubLiteRecordSender<T> extends KafkaRecordSenderImpl<T, byte[], byte[]> {
/**
* Create a new {@link PubSubLiteRecordSender}.
*/
public PubSubLiteRecordSender(
final TopicPath topicPath,
final Serializer<T> serializer,
final Function<T, String> keyAccessor,
final Function<T, Long> timestampAccessor) {
super(
ProducerSettings.newBuilder().setTopicPath(topicPath).build().instantiate(),
new PubSubLiteRecordFactory<T>(serializer),
topicPath.toString(),
keyAccessor,
timestampAccessor);
}
/**
* Create a {@link TopicPath} from the provided Google Cloud settings.
*/
public static TopicPath buildTopicPath(final String cloudRegion, final char zoneId,
final long projectNumber, final String topicId) {
return TopicPath.newBuilder()
.setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
.setProject(ProjectNumber.of(projectNumber))
.setName(TopicName.of(topicId))
.build();
}
private static class PubSubLiteRecordFactory<T> implements KafkaRecordFactory<T, byte[], byte[]> {
private final Serializer<T> serializer;
public PubSubLiteRecordFactory(final Serializer<T> serializer) {
this.serializer = serializer;
}
@Override
public ProducerRecord<byte[], byte[]> create(final String topic, final String key,
final T value, final long timestamp) {
return new ProducerRecord<>(
topic,
null,
timestamp,
key.getBytes(),
this.serializer.serialize(topic, value));
}
}
}
Loading