Skip to content
Snippets Groups Projects
Commit f0cf85e8 authored by Sören Henning's avatar Sören Henning
Browse files

Refactor code to support pubsub

parent 4b111bfc
No related branches found
No related tags found
1 merge request!225Add option to generate load via Google PubSub
Pipeline #6652 passed
......@@ -39,6 +39,8 @@ public final class ConfigurationKeys {
public static final String HTTP_URL = "HTTP_URL";
public static final String PUBSUB_INPUT_TOPIC = "PUBSUB_INPUT_TOPIC";
private ConfigurationKeys() {}
}
package theodolite.commons.workloadgeneration;
import java.net.URI;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.model.records.ActivePowerRecord;
class EnvVarLoadGeneratorFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(EnvVarLoadGeneratorFactory.class);
public LoadGenerator create() {
final int numSensors = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.NUM_SENSORS),
Integer.toString(LoadGenerator.NUMBER_OF_KEYS_DEFAULT)));
final int periodMs = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.PERIOD_MS),
Integer.toString(LoadGenerator.PERIOD_MS_DEFAULT)));
final double value = Double.parseDouble(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.VALUE),
Integer.toString(LoadGenerator.VALUE_DEFAULT)));
final int threads = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.THREADS),
Integer.toString(LoadGenerator.THREADS_DEFAULT)));
return LoadGenerator.fromDefaults()
.setClusterConfig(this.buildClusterConfig())
.setLoadDefinition(new WorkloadDefinition(
new KeySpace(LoadGenerator.SENSOR_PREFIX_DEFAULT, numSensors),
Duration.ofMillis(periodMs)))
.setGeneratorConfig(new LoadGeneratorConfig(
TitanRecordGenerator.forConstantValue(value),
this.buildRecordSender()))
.withThreads(threads);
}
private ClusterConfig buildClusterConfig() {
final String bootstrapServer = System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER);
final String kubernetesDnsName = System.getenv(ConfigurationKeys.KUBERNETES_DNS_NAME);
ClusterConfig clusterConfig;
if (bootstrapServer != null) { // NOPMD
clusterConfig = ClusterConfig.fromBootstrapServer(bootstrapServer);
LOGGER.info("Use bootstrap server '{}'.", bootstrapServer);
} else if (kubernetesDnsName != null) { // NOPMD
clusterConfig = ClusterConfig.fromKubernetesDnsName(kubernetesDnsName);
LOGGER.info("Use Kubernetes DNS name '{}'.", kubernetesDnsName);
} else {
clusterConfig = ClusterConfig.fromBootstrapServer(LoadGenerator.BOOTSTRAP_SERVER_DEFAULT);
LOGGER.info(
"Neither a bootstrap server nor a Kubernetes DNS name was provided. Use default bootstrap server '{}'.", // NOCS
LoadGenerator.BOOTSTRAP_SERVER_DEFAULT);
}
final String port = System.getenv(ConfigurationKeys.PORT);
if (port != null) {
clusterConfig.setPort(Integer.parseInt(port));
}
final String portAutoIncrement = System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT);
if (portAutoIncrement != null) {
clusterConfig.setPortAutoIncrement(Boolean.parseBoolean(portAutoIncrement));
}
final String clusterNamePrefix = System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX);
if (clusterNamePrefix != null) {
clusterConfig.setClusterNamePrefix(portAutoIncrement);
}
return clusterConfig;
}
private RecordSender<ActivePowerRecord> buildRecordSender() {
final LoadGeneratorTarget target = LoadGeneratorTarget.from(
Objects.requireNonNullElse(System.getenv(ConfigurationKeys.TARGET),
LoadGenerator.TARGET_DEFAULT.getValue()));
final RecordSender<ActivePowerRecord> recordSender; // NOPMD
if (target == LoadGeneratorTarget.KAFKA) {
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
LoadGenerator.KAFKA_BOOTSTRAP_SERVERS_DEFAULT);
final String kafkaInputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC),
LoadGenerator.KAFKA_TOPIC_DEFAULT);
final String schemaRegistryUrl = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
LoadGenerator.SCHEMA_REGISTRY_URL_DEFAULT);
final Properties kafkaProperties = new Properties();
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE));
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS));
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY));
recordSender = TitanKafkaSenderFactory.forKafkaConfig(
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl);
LOGGER.info(
"Use Kafka as target with bootstrap server '{}', schema registry url '{}' and topic '{}'.", // NOCS
kafkaBootstrapServers, schemaRegistryUrl, kafkaInputTopic);
} else if (target == LoadGeneratorTarget.HTTP) {
final URI url = URI.create(
Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.HTTP_URL),
LoadGenerator.HTTP_URI_DEFAULT));
recordSender = new HttpRecordSender<>(url);
LOGGER.info("Use HTTP server as target with url '{}'.", url);
} else if (target == LoadGeneratorTarget.PUBSUB) {
final String pubSubInputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.PUBSUB_INPUT_TOPIC),
LoadGenerator.PUBSUB_TOPIC_DEFAULT);
recordSender = TitanPubSubSenderFactory.forPubSubConfig(pubSubInputTopic);
LOGGER.info("Use Pub/Sub as target with topic '{}'.", pubSubInputTopic);
} else {
// Should never happen
throw new IllegalStateException("Target " + target + " is not handled yet.");
}
return recordSender;
}
}
package theodolite.commons.workloadgeneration;
import java.net.URI;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A Theodolite load generator.
*/
public final class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701";
private static final String SENSOR_PREFIX_DEFAULT = "s_";
private static final int NUMBER_OF_KEYS_DEFAULT = 10;
private static final int PERIOD_MS_DEFAULT = 1000;
private static final int VALUE_DEFAULT = 10;
private static final int THREADS_DEFAULT = 4;
private static final LoadGeneratorTarget TARGET_DEFAULT = LoadGeneratorTarget.KAFKA;
private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
private static final String KAFKA_TOPIC_DEFAULT = "input";
private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD
private static final String HTTP_URI_DEFAULT = "http://localhost:8080";
public static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701";
public static final String SENSOR_PREFIX_DEFAULT = "s_";
public static final int NUMBER_OF_KEYS_DEFAULT = 10;
public static final int PERIOD_MS_DEFAULT = 1000;
public static final int VALUE_DEFAULT = 10;
public static final int THREADS_DEFAULT = 4;
public static final LoadGeneratorTarget TARGET_DEFAULT = LoadGeneratorTarget.KAFKA;
// Target: HTTP
public static final String HTTP_URI_DEFAULT = "http://localhost:8080";
// Target: Kafka
public static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
public static final String KAFKA_TOPIC_DEFAULT = "input"; // NOCS
public static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD
// Target: Pub/Sub
public static final String PUBSUB_TOPIC_DEFAULT = "input"; // NOCS
private ClusterConfig clusterConfig;
private WorkloadDefinition loadDefinition;
......@@ -106,101 +102,7 @@ public final class LoadGenerator {
* Create a basic {@link LoadGenerator} from environment variables.
*/
public static LoadGenerator fromEnvironment() {
final String bootstrapServer = System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER);
final String kubernetesDnsName = System.getenv(ConfigurationKeys.KUBERNETES_DNS_NAME);
ClusterConfig clusterConfig;
if (bootstrapServer != null) { // NOPMD
clusterConfig = ClusterConfig.fromBootstrapServer(bootstrapServer);
LOGGER.info("Use bootstrap server '{}'.", bootstrapServer);
} else if (kubernetesDnsName != null) { // NOPMD
clusterConfig = ClusterConfig.fromKubernetesDnsName(kubernetesDnsName);
LOGGER.info("Use Kubernetes DNS name '{}'.", kubernetesDnsName);
} else {
clusterConfig = ClusterConfig.fromBootstrapServer(BOOTSTRAP_SERVER_DEFAULT);
LOGGER.info(
"Neither a bootstrap server nor a Kubernetes DNS name was provided. Use default bootstrap server '{}'.", // NOCS
BOOTSTRAP_SERVER_DEFAULT);
}
final String port = System.getenv(ConfigurationKeys.PORT);
if (port != null) {
clusterConfig.setPort(Integer.parseInt(port));
}
final String portAutoIncrement = System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT);
if (portAutoIncrement != null) {
clusterConfig.setPortAutoIncrement(Boolean.parseBoolean(portAutoIncrement));
}
final String clusterNamePrefix = System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX);
if (clusterNamePrefix != null) {
clusterConfig.setClusterNamePrefix(portAutoIncrement);
}
final LoadGeneratorTarget target = LoadGeneratorTarget.from(
Objects.requireNonNullElse(System.getenv(ConfigurationKeys.TARGET),
TARGET_DEFAULT.getValue()));
final RecordSender<ActivePowerRecord> recordSender; // NOPMD
if (target == LoadGeneratorTarget.KAFKA) {
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
KAFKA_BOOTSTRAP_SERVERS_DEFAULT);
final String kafkaInputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC),
KAFKA_TOPIC_DEFAULT);
final String schemaRegistryUrl = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
SCHEMA_REGISTRY_URL_DEFAULT);
final Properties kafkaProperties = new Properties();
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE));
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS));
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY));
recordSender = TitanKafkaSenderFactory.forKafkaConfig(
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl);
LOGGER.info(
"Use Kafka as target with bootstrap server '{}', schema registry url '{}' and topic '{}'.", // NOCS
kafkaBootstrapServers, schemaRegistryUrl, kafkaInputTopic);
} else if (target == LoadGeneratorTarget.HTTP) {
final URI url = URI.create(
Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.HTTP_URL),
HTTP_URI_DEFAULT));
recordSender = new HttpRecordSender<>(url);
LOGGER.info("Use HTTP server as target with url '{}'.", url);
} else {
// Should never happen
throw new IllegalStateException("Target " + target + " is not handled yet.");
}
final int numSensors = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.NUM_SENSORS),
Integer.toString(NUMBER_OF_KEYS_DEFAULT)));
final int periodMs = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.PERIOD_MS),
Integer.toString(PERIOD_MS_DEFAULT)));
final double value = Double.parseDouble(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.VALUE),
Integer.toString(VALUE_DEFAULT)));
final int threads = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.THREADS),
Integer.toString(THREADS_DEFAULT)));
return new LoadGenerator()
.setClusterConfig(clusterConfig)
.setLoadDefinition(new WorkloadDefinition(
new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors),
Duration.ofMillis(periodMs)))
.setGeneratorConfig(new LoadGeneratorConfig(
TitanRecordGenerator.forConstantValue(value),
recordSender))
.withThreads(threads);
return new EnvVarLoadGeneratorFactory().create();
}
}
......@@ -4,7 +4,7 @@ import java.util.stream.Stream;
enum LoadGeneratorTarget {
KAFKA("kafka"), HTTP("http");
HTTP("http"), KAFKA("kafka"), PUBSUB("pubsub");
private final String value;
......
......@@ -6,23 +6,18 @@ import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/**
* Sends monitoring records to Kafka.
*
* @param <T> {@link SpecificRecord} to send
*/
public class PubSubRecordSender<T extends SpecificRecord> implements RecordSender<T> {
public class PubSubRecordSender<T> implements RecordSender<T> {
private static final int SHUTDOWN_TIMEOUT_SEC = 5;
......@@ -30,13 +25,11 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende
private final String topic;
private final Function<T, String> keyAccessor;
private final Function<T, ByteBuffer> recordSerializer;
private final Function<T, Long> timestampAccessor;
private final Function<T, ByteBuffer> recordSerializer;
private final Producer<String, T> producer; // TODO remove
private final Function<T, String> orderingKeyAccessor;
private final Publisher publisher;
......@@ -45,59 +38,20 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende
*/
private PubSubRecordSender(final Builder<T> builder) {
this.topic = builder.topic;
this.keyAccessor = builder.keyAccessor;
this.orderingKeyAccessor = builder.orderingKeyAccessor;
this.timestampAccessor = builder.timestampAccessor;
this.recordSerializer = builder.recordSerializer;
final Properties properties = new Properties();
properties.putAll(builder.defaultProperties);
properties.put("bootstrap.servers", builder.bootstrapServers);
// properties.put("acks", this.acknowledges);
// properties.put("batch.size", this.batchSize);
// properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory);
final SchemaRegistryAvroSerdeFactory avroSerdeFactory =
new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl);
this.producer = new KafkaProducer<>(
properties,
new StringSerializer(),
avroSerdeFactory.<T>forKeys().serializer());
try {
this.publisher = Publisher.newBuilder(this.topic).build();
} catch (final IOException e) {
// TODO Auto-generated catch block
// e.printStackTrace();
throw new IllegalStateException(e);
}
}
/**
* Write the passed monitoring record to Kafka.
* Terminate this {@link PubSubRecordSender} and shutdown the underlying {@link Publisher}.
*/
public void write(final T record) {
// TODO fix this
final ByteBuffer byteBuffer = this.recordSerializer.apply(record);
// try {
// byteBuffer = ((ActivePowerRecord) monitoringRecord).toByteBuffer();
// } catch (final IOException e1) {
// // TODO Auto-generated catch block
// e1.printStackTrace();
// throw new IllegalStateException(e1);
// }
final ByteString data = ByteString.copyFrom(byteBuffer);
final PubsubMessage message = PubsubMessage.newBuilder()
.setOrderingKey(this.keyAccessor.apply(record))
.setPublishTime(Timestamps.fromMillis(this.timestampAccessor.apply(record)))
.setData(data)
.build();
this.publisher.publish(message);
LOGGER.debug("Send message to PubSub topic {}: {}", this.topic, message);
}
public void terminate() {
this.publisher.shutdown();
try {
......@@ -105,54 +59,51 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende
} catch (final InterruptedException e) {
throw new IllegalStateException(e);
}
this.producer.close();
}
@Override
public void send(final T message) {
this.write(message);
public void send(final T record) {
final ByteBuffer byteBuffer = this.recordSerializer.apply(record);
final ByteString data = ByteString.copyFrom(byteBuffer);
final PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder().setData(data);
if (this.orderingKeyAccessor != null) {
messageBuilder.setOrderingKey(this.orderingKeyAccessor.apply(record));
}
if (this.timestampAccessor != null) {
messageBuilder.setPublishTime(Timestamps.fromMillis(this.timestampAccessor.apply(record)));
}
this.publisher.publish(messageBuilder.build());
LOGGER.debug("Send message to PubSub topic {}: {}", this.topic, messageBuilder);
}
public static <T extends SpecificRecord> Builder<T> builder(
final String bootstrapServers,
public static <T> Builder<T> builder(
final String topic,
final String schemaRegistryUrl) {
return new Builder<>(bootstrapServers, topic, schemaRegistryUrl);
final Function<T, ByteBuffer> recordSerializer) {
return new Builder<>(topic, recordSerializer);
}
/**
* Builder class to build a new {@link KafkaRecordSender}.
* Builder class to build a new {@link PubSubRecordSender}.
*
* @param <T> Type of the records that should later be send.
*/
public static class Builder<T extends SpecificRecord> {
public static class Builder<T> {
private final String bootstrapServers;
private final String topic;
private final String schemaRegistryUrl;
private Function<T, String> keyAccessor = x -> ""; // NOPMD
private Function<T, Long> timestampAccessor = x -> null; // NOPMD
// TODO
private Function<T, ByteBuffer> recordSerializer = null; // NOPMD
private Properties defaultProperties = new Properties(); // NOPMD
private final Function<T, ByteBuffer> recordSerializer; // NOPMD
private Function<T, Long> timestampAccessor = null; // NOPMD
private Function<T, String> orderingKeyAccessor = null; // NOPMD
/**
* Creates a Builder object for a {@link KafkaRecordSender}.
* Creates a Builder object for a {@link PubSubRecordSender}.
*
* @param bootstrapServers The Server to for accessing Kafka.
* @param topic The topic where to write.
* @param schemaRegistryUrl URL to the schema registry for avro.
* @param recordSerializer A function serializing objects to {@link ByteBuffer}.
*/
private Builder(final String bootstrapServers, final String topic,
final String schemaRegistryUrl) {
this.bootstrapServers = bootstrapServers;
private Builder(final String topic, final Function<T, ByteBuffer> recordSerializer) {
this.topic = topic;
this.schemaRegistryUrl = schemaRegistryUrl;
}
public Builder<T> keyAccessor(final Function<T, String> keyAccessor) {
this.keyAccessor = keyAccessor;
return this;
this.recordSerializer = recordSerializer;
}
public Builder<T> timestampAccessor(final Function<T, Long> timestampAccessor) {
......@@ -160,13 +111,8 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende
return this;
}
public Builder<T> recordSerializer(final Function<T, ByteBuffer> recordSerializer) {
this.recordSerializer = recordSerializer;
return this;
}
public Builder<T> defaultProperties(final Properties defaultProperties) {
this.defaultProperties = defaultProperties;
public Builder<T> orderingKeyAccessor(final Function<T, String> keyAccessor) {
this.orderingKeyAccessor = keyAccessor;
return this;
}
......
package theodolite.commons.workloadgeneration;
import java.io.IOException;
import java.util.Properties;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A factory for creating {@link KafkaRecordSender}s that sends Titan {@link ActivePowerRecord}s.
* A factory for creating {@link PubSubRecordSender}s that sends Titan {@link ActivePowerRecord}s.
*/
public final class TitanPubSubSenderFactory {
private TitanPubSubSenderFactory() {}
/**
* Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka
* Create a new {@link PubSubRecordSender} for {@link ActivePowerRecord}s for the given PubSub
* configuration.
*/
public static PubSubRecordSender<ActivePowerRecord> forKafkaConfig(
final String bootstrapServers,
final String topic,
final String schemaRegistryUrl) {
return forKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties());
}
/**
* Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka
* configuration.
*/
public static PubSubRecordSender<ActivePowerRecord> forKafkaConfig(
final String bootstrapServers,
final String topic,
final String schemaRegistryUrl,
final Properties properties) {
public static PubSubRecordSender<ActivePowerRecord> forPubSubConfig(final String topic) {
return PubSubRecordSender
.<ActivePowerRecord>builder(
bootstrapServers,
topic,
schemaRegistryUrl)
.keyAccessor(r -> r.getIdentifier())
.timestampAccessor(r -> r.getTimestamp())
.recordSerializer(r -> {
.<ActivePowerRecord>builder(topic, r -> {
try {
return r.toByteBuffer();
} catch (final IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
throw new IllegalStateException(e);
}
})
// .orderingKeyAccessor(r -> r.getIdentifier())
.timestampAccessor(r -> r.getTimestamp())
.build();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment