diff --git a/theodolite-benchmarks/load-generator-commons/build.gradle b/theodolite-benchmarks/load-generator-commons/build.gradle index 2d8f77b5154b5b788e0729da69122b443740ce75..927fd4d5ea59b27887ca09ba0ed1189e841d56ac 100644 --- a/theodolite-benchmarks/load-generator-commons/build.gradle +++ b/theodolite-benchmarks/load-generator-commons/build.gradle @@ -21,6 +21,8 @@ dependencies { implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'org.apache.kafka:kafka-streams:2.6.0' // TODO required? + implementation platform('com.google.cloud:libraries-bom:24.2.0') + implementation 'com.google.cloud:google-cloud-pubsub' // Use JUnit test framework testImplementation 'junit:junit:4.12' diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java new file mode 100644 index 0000000000000000000000000000000000000000..c5da400c67bf1929c5404f6a4d30efb663f27cce --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java @@ -0,0 +1,175 @@ +package theodolite.commons.workloadgeneration; + +import com.google.api.core.ApiFuture; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.protobuf.util.Timestamps; +import com.google.pubsub.v1.PubsubMessage; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Properties; +import java.util.function.Function; +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * Sends monitoring records to Kafka. + * + * @param <T> {@link SpecificRecord} to send + */ +public class PubSubRecordSender<T extends SpecificRecord> implements RecordSender<T> { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); + + private final String topic; + + private final Function<T, String> keyAccessor; + + private final Function<T, Long> timestampAccessor; + + private final Producer<String, T> producer; // TODO remove + + private final Publisher publisher; + + /** + * Create a new {@link KafkaRecordSender}. + */ + private PubSubRecordSender(final Builder<T> builder) { + this.topic = builder.topic; + this.keyAccessor = builder.keyAccessor; + this.timestampAccessor = builder.timestampAccessor; + + final Properties properties = new Properties(); + properties.putAll(builder.defaultProperties); + properties.put("bootstrap.servers", builder.bootstrapServers); + // properties.put("acks", this.acknowledges); + // properties.put("batch.size", this.batchSize); + // properties.put("linger.ms", this.lingerMs); + // properties.put("buffer.memory", this.bufferMemory); + + final SchemaRegistryAvroSerdeFactory avroSerdeFactory = + new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl); + this.producer = new KafkaProducer<>( + properties, + new StringSerializer(), + avroSerdeFactory.<T>forKeys().serializer()); + + try { + this.publisher = Publisher.newBuilder(this.topic).build(); + } catch (final IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + throw new IllegalStateException(e); + } + } + + /** + * Write the passed monitoring record to Kafka. + */ + public void write(final T monitoringRecord) { + + // TODO fix this + ByteBuffer byteBuffer; + try { + byteBuffer = ((ActivePowerRecord) monitoringRecord).toByteBuffer(); + } catch (final IOException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + throw new IllegalStateException(e1); + } + final ByteString data = ByteString.copyFrom(byteBuffer); + + final PubsubMessage message = PubsubMessage.newBuilder() + .setOrderingKey(this.keyAccessor.apply(monitoringRecord)) + .setPublishTime(Timestamps.fromMillis(this.timestampAccessor.apply(monitoringRecord))) + .setData(data) + .build(); + final ApiFuture<String> future = this.publisher.publish(message); + + final ProducerRecord<String, T> record = + new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord), + this.keyAccessor.apply(monitoringRecord), monitoringRecord); + + LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); + try { + this.producer.send(record); + } catch (final SerializationException e) { + LOGGER.warn( + "Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS + e); + } + } + + public void terminate() { + this.producer.close(); + } + + @Override + public void send(final T message) { + this.write(message); + } + + public static <T extends SpecificRecord> Builder<T> builder( + final String bootstrapServers, + final String topic, + final String schemaRegistryUrl) { + return new Builder<>(bootstrapServers, topic, schemaRegistryUrl); + } + + /** + * Builder class to build a new {@link KafkaRecordSender}. + * + * @param <T> Type of the records that should later be send. + */ + public static class Builder<T extends SpecificRecord> { + + private final String bootstrapServers; + private final String topic; + private final String schemaRegistryUrl; + private Function<T, String> keyAccessor = x -> ""; // NOPMD + private Function<T, Long> timestampAccessor = x -> null; // NOPMD + private Properties defaultProperties = new Properties(); // NOPMD + + /** + * Creates a Builder object for a {@link KafkaRecordSender}. + * + * @param bootstrapServers The Server to for accessing Kafka. + * @param topic The topic where to write. + * @param schemaRegistryUrl URL to the schema registry for avro. + */ + private Builder(final String bootstrapServers, final String topic, + final String schemaRegistryUrl) { + this.bootstrapServers = bootstrapServers; + this.topic = topic; + this.schemaRegistryUrl = schemaRegistryUrl; + } + + public Builder<T> keyAccessor(final Function<T, String> keyAccessor) { + this.keyAccessor = keyAccessor; + return this; + } + + public Builder<T> timestampAccessor(final Function<T, Long> timestampAccessor) { + this.timestampAccessor = timestampAccessor; + return this; + } + + public Builder<T> defaultProperties(final Properties defaultProperties) { + this.defaultProperties = defaultProperties; + return this; + } + + public PubSubRecordSender<T> build() { + return new PubSubRecordSender<>(this); + } + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..1af4be779e909e607d9c3c2f1c40430b0194bb28 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java @@ -0,0 +1,42 @@ +package theodolite.commons.workloadgeneration; + +import java.util.Properties; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A factory for creating {@link KafkaRecordSender}s that sends Titan {@link ActivePowerRecord}s. + */ +public final class TitanPubSubSenderFactory { + + private TitanPubSubSenderFactory() {} + + /** + * Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka + * configuration. + */ + public static PubSubRecordSender<ActivePowerRecord> forKafkaConfig( + final String bootstrapServers, + final String topic, + final String schemaRegistryUrl) { + return forKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties()); + } + + /** + * Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka + * configuration. + */ + public static PubSubRecordSender<ActivePowerRecord> forKafkaConfig( + final String bootstrapServers, + final String topic, + final String schemaRegistryUrl, + final Properties properties) { + return PubSubRecordSender + .<ActivePowerRecord>builder( + bootstrapServers, + topic, + schemaRegistryUrl) + .keyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) + .build(); + } +}