Skip to content
Snippets Groups Projects
Commit ebd69440 authored by Sören Henning's avatar Sören Henning
Browse files

Introduce RecordSerializer

parent e7013da1
No related branches found
No related tags found
1 merge request!225Add option to generate load via Google PubSub
Pipeline #6353 failed
package theodolite.commons.workloadgeneration;
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
......@@ -8,17 +7,15 @@ import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Sends monitoring records to Kafka.
......@@ -27,6 +24,8 @@ import titan.ccp.model.records.ActivePowerRecord;
*/
public class PubSubRecordSender<T extends SpecificRecord> implements RecordSender<T> {
private static final int SHUTDOWN_TIMEOUT_SEC = 5;
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
private final String topic;
......@@ -35,6 +34,8 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende
private final Function<T, Long> timestampAccessor;
private final Function<T, ByteBuffer> recordSerializer;
private final Producer<String, T> producer; // TODO remove
private final Publisher publisher;
......@@ -46,6 +47,7 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende
this.topic = builder.topic;
this.keyAccessor = builder.keyAccessor;
this.timestampAccessor = builder.timestampAccessor;
this.recordSerializer = builder.recordSerializer;
final Properties properties = new Properties();
properties.putAll(builder.defaultProperties);
......@@ -66,7 +68,7 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende
this.publisher = Publisher.newBuilder(this.topic).build();
} catch (final IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
// e.printStackTrace();
throw new IllegalStateException(e);
}
}
......@@ -74,41 +76,35 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende
/**
* Write the passed monitoring record to Kafka.
*/
public void write(final T monitoringRecord) {
public void write(final T record) {
// TODO fix this
ByteBuffer byteBuffer;
try {
byteBuffer = ((ActivePowerRecord) monitoringRecord).toByteBuffer();
} catch (final IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
throw new IllegalStateException(e1);
}
final ByteBuffer byteBuffer = this.recordSerializer.apply(record);
// try {
// byteBuffer = ((ActivePowerRecord) monitoringRecord).toByteBuffer();
// } catch (final IOException e1) {
// // TODO Auto-generated catch block
// e1.printStackTrace();
// throw new IllegalStateException(e1);
// }
final ByteString data = ByteString.copyFrom(byteBuffer);
final PubsubMessage message = PubsubMessage.newBuilder()
.setOrderingKey(this.keyAccessor.apply(monitoringRecord))
.setPublishTime(Timestamps.fromMillis(this.timestampAccessor.apply(monitoringRecord)))
.setOrderingKey(this.keyAccessor.apply(record))
.setPublishTime(Timestamps.fromMillis(this.timestampAccessor.apply(record)))
.setData(data)
.build();
final ApiFuture<String> future = this.publisher.publish(message);
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
try {
this.producer.send(record);
} catch (final SerializationException e) {
LOGGER.warn(
"Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS
e);
}
this.publisher.publish(message);
LOGGER.debug("Send message to PubSub topic {}: {}", this.topic, message);
}
public void terminate() {
this.publisher.shutdown();
try {
this.publisher.awaitTermination(SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
throw new IllegalStateException(e);
}
this.producer.close();
}
......@@ -136,6 +132,8 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende
private final String schemaRegistryUrl;
private Function<T, String> keyAccessor = x -> ""; // NOPMD
private Function<T, Long> timestampAccessor = x -> null; // NOPMD
// TODO
private Function<T, ByteBuffer> recordSerializer = null; // NOPMD
private Properties defaultProperties = new Properties(); // NOPMD
/**
......@@ -162,6 +160,11 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende
return this;
}
public Builder<T> recordSerializer(final Function<T, ByteBuffer> recordSerializer) {
this.recordSerializer = recordSerializer;
return this;
}
public Builder<T> defaultProperties(final Properties defaultProperties) {
this.defaultProperties = defaultProperties;
return this;
......
package theodolite.commons.workloadgeneration;
import java.io.IOException;
import java.util.Properties;
import titan.ccp.model.records.ActivePowerRecord;
......@@ -37,6 +38,15 @@ public final class TitanPubSubSenderFactory {
schemaRegistryUrl)
.keyAccessor(r -> r.getIdentifier())
.timestampAccessor(r -> r.getTimestamp())
.recordSerializer(r -> {
try {
return r.toByteBuffer();
} catch (final IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
throw new IllegalStateException(e);
}
})
.build();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment