diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java index c5da400c67bf1929c5404f6a4d30efb663f27cce..02bf1401aae49f726546aec399856c887470cda3 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java @@ -1,6 +1,5 @@ package theodolite.commons.workloadgeneration; -import com.google.api.core.ApiFuture; import com.google.cloud.pubsub.v1.Publisher; import com.google.protobuf.ByteString; import com.google.protobuf.util.Timestamps; @@ -8,17 +7,15 @@ import com.google.pubsub.v1.PubsubMessage; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Properties; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; -import titan.ccp.model.records.ActivePowerRecord; /** * Sends monitoring records to Kafka. @@ -27,6 +24,8 @@ import titan.ccp.model.records.ActivePowerRecord; */ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSender<T> { + private static final int SHUTDOWN_TIMEOUT_SEC = 5; + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); private final String topic; @@ -35,6 +34,8 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende private final Function<T, Long> timestampAccessor; + private final Function<T, ByteBuffer> recordSerializer; + private final Producer<String, T> producer; // TODO remove private final Publisher publisher; @@ -46,6 +47,7 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende this.topic = builder.topic; this.keyAccessor = builder.keyAccessor; this.timestampAccessor = builder.timestampAccessor; + this.recordSerializer = builder.recordSerializer; final Properties properties = new Properties(); properties.putAll(builder.defaultProperties); @@ -66,7 +68,7 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende this.publisher = Publisher.newBuilder(this.topic).build(); } catch (final IOException e) { // TODO Auto-generated catch block - e.printStackTrace(); + // e.printStackTrace(); throw new IllegalStateException(e); } } @@ -74,41 +76,35 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende /** * Write the passed monitoring record to Kafka. */ - public void write(final T monitoringRecord) { + public void write(final T record) { // TODO fix this - ByteBuffer byteBuffer; - try { - byteBuffer = ((ActivePowerRecord) monitoringRecord).toByteBuffer(); - } catch (final IOException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - throw new IllegalStateException(e1); - } + final ByteBuffer byteBuffer = this.recordSerializer.apply(record); + // try { + // byteBuffer = ((ActivePowerRecord) monitoringRecord).toByteBuffer(); + // } catch (final IOException e1) { + // // TODO Auto-generated catch block + // e1.printStackTrace(); + // throw new IllegalStateException(e1); + // } final ByteString data = ByteString.copyFrom(byteBuffer); final PubsubMessage message = PubsubMessage.newBuilder() - .setOrderingKey(this.keyAccessor.apply(monitoringRecord)) - .setPublishTime(Timestamps.fromMillis(this.timestampAccessor.apply(monitoringRecord))) + .setOrderingKey(this.keyAccessor.apply(record)) + .setPublishTime(Timestamps.fromMillis(this.timestampAccessor.apply(record))) .setData(data) .build(); - final ApiFuture<String> future = this.publisher.publish(message); - - final ProducerRecord<String, T> record = - new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord), - this.keyAccessor.apply(monitoringRecord), monitoringRecord); - - LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); - try { - this.producer.send(record); - } catch (final SerializationException e) { - LOGGER.warn( - "Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS - e); - } + this.publisher.publish(message); + LOGGER.debug("Send message to PubSub topic {}: {}", this.topic, message); } public void terminate() { + this.publisher.shutdown(); + try { + this.publisher.awaitTermination(SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + throw new IllegalStateException(e); + } this.producer.close(); } @@ -136,6 +132,8 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende private final String schemaRegistryUrl; private Function<T, String> keyAccessor = x -> ""; // NOPMD private Function<T, Long> timestampAccessor = x -> null; // NOPMD + // TODO + private Function<T, ByteBuffer> recordSerializer = null; // NOPMD private Properties defaultProperties = new Properties(); // NOPMD /** @@ -162,6 +160,11 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende return this; } + public Builder<T> recordSerializer(final Function<T, ByteBuffer> recordSerializer) { + this.recordSerializer = recordSerializer; + return this; + } + public Builder<T> defaultProperties(final Properties defaultProperties) { this.defaultProperties = defaultProperties; return this; diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java index 1af4be779e909e607d9c3c2f1c40430b0194bb28..f5be8258b8852754a11b4aa36c4262d3e7d4daf2 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java @@ -1,5 +1,6 @@ package theodolite.commons.workloadgeneration; +import java.io.IOException; import java.util.Properties; import titan.ccp.model.records.ActivePowerRecord; @@ -37,6 +38,15 @@ public final class TitanPubSubSenderFactory { schemaRegistryUrl) .keyAccessor(r -> r.getIdentifier()) .timestampAccessor(r -> r.getTimestamp()) + .recordSerializer(r -> { + try { + return r.toByteBuffer(); + } catch (final IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + throw new IllegalStateException(e); + } + }) .build(); } }