diff --git a/docs/theodolite-benchmarks/load-generator.md b/docs/theodolite-benchmarks/load-generator.md index 17845c42d47e94a5b696dee1d774890de8d6fff1..e92238e988436ded5444c4ce669dcc84e4e1a2b3 100644 --- a/docs/theodolite-benchmarks/load-generator.md +++ b/docs/theodolite-benchmarks/load-generator.md @@ -47,7 +47,7 @@ The prebuilt container images can be configured with the following environment v | `PORT` | Port used for for coordination among load generator instances. | 5701 | | `PORT_AUTO_INCREMENT` | If set to true and the specified PORT is already used, use the next higher one. Useful if multiple instances should run on the same host, without configuring each instance individually. | true | | `CLUSTER_NAME_PREFIX` | Only required if unrelated load generators form a cluster. | theodolite-load-generation | -| `TARGET` | The target system the load generator send messages to. Valid values are: `kafka`, `http`. | `kafka` | +| `TARGET` | The target system the load generator send messages to. Valid values are: `kafka`, `http` and `pubsub`. | `kafka` | | `KAFKA_BOOTSTRAP_SERVERS` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. See [Kafka producer config: `bootstrap.servers`](https://kafka.apache.org/documentation/#producerconfigs_bootstrap.servers) for more information. Only used if Kafka is set as `TARGET`. | `localhost:9092` | | `KAFKA_INPUT_TOPIC` | Name of the Kafka topic, which should receive the generated messages. Only used if Kafka is set as `TARGET`. | input | | `SCHEMA_REGISTRY_URL` | URL of the [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry). | `http://localhost:8081` | @@ -55,6 +55,9 @@ The prebuilt container images can be configured with the following environment v | `KAFKA_LINGER_MS` | Value for the Kafka producer configuration: [`linger.ms`](https://kafka.apache.org/documentation/#producerconfigs_linger.ms). Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`linger.ms`](https://kafka.apache.org/documentation/#producerconfigs_linger.ms) | | `KAFKA_BUFFER_MEMORY` | Value for the Kafka producer configuration: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) | | `HTTP_URL` | The URL the load generator should post messages to. Only used if HTTP is set as `TARGET`. | | +| `PUBSUB_INPUT_TOPIC` | The Google Cloud Pub/Sub topic to write messages to. Only used if Pub/Sub is set as `TARGET`. | input | +| `PUBSUB_PROJECT` | The Google Cloud this Pub/Sub topic is associated with. Only used if Pub/Sub is set as `TARGET`. | | +| `PUBSUB_EMULATOR_HOST` | A Pub/Sub emulator host. Only used if Pub/Sub is set as `TARGET`. | | | `NUM_SENSORS` | The amount of simulated sensors. | 10 | | `PERIOD_MS` | The time in milliseconds between generating two messages for the same sensor. With our Theodolite benchmarks, we apply an [open workload model](https://www.usenix.org/legacy/event/nsdi06/tech/full_papers/schroeder/schroeder.pdf) in which new messages are generated at a fixed rate, without considering the think time of the target server nor the time required for generating a message. | 1000 | | `VALUE` | The constant `valueInW` of an `ActivePowerRecord`. | 10 | @@ -64,10 +67,10 @@ Please note that there are some additional configuration options for benchmark [ ## Creating a custom load generator -To create a custom load generator, you need to import the [load-generator-commons](https://github.com/cau-se/theodolite/tree/master/theodolite-benchmarks/load-generator-commons) project. You can then create an instance of the `LoadGenerator` object and call its `run` method: +To create a custom load generator, you need to import the [load-generator-commons](https://github.com/cau-se/theodolite/tree/master/theodolite-benchmarks/load-generator-commons) project. You can then create an instance of the `LoadGenerator` populated with a default configuration, adjust it as desired, and start it by calling its `run` method: ```java -LoadGenerator loadGenerator = new LoadGenerator() +LoadGenerator loadGenerator = new LoadGenerator.fromDefaults() .setClusterConfig(clusterConfig) .setLoadDefinition(new WorkloadDefinition( new KeySpace(key_prefix, numSensors), @@ -79,9 +82,8 @@ LoadGenerator loadGenerator = new LoadGenerator() loadGenerator.run(); ``` -Alternatively, you can also start with a load generator populated with a default configuration or created from environment variables and then adjust the `LoadGenerator` as desired: +Alternatively, you can also start with a `LoadGenerator` created from environment variables and, optionally, adjust it as desired: ```java -LoadGenerator loadGeneratorFromDefaults = LoadGenerator.fromDefaults() -LoadGenerator loadGeneratorFromEnv = LoadGenerator.fromEnvironment(); +LoadGenerator loadGenerator = LoadGenerator.fromEnvironment(); ``` diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java index 03c5ca1daa7ffab71a4d08c04f677d7412e3a2be..3e94fb4c878401183f45ff384e39dd6bc0291a27 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java @@ -8,8 +8,8 @@ import org.slf4j.LoggerFactory; import titan.ccp.common.configuration.ServiceConfigurations; /** - * Abstraction of a Beam microservice. - * Encapsulates the corresponding {@link PipelineOptions} and the beam Runner. + * Abstraction of a Beam microservice. Encapsulates the corresponding {@link PipelineOptions} and + * the beam Runner. */ public class AbstractBeamService { @@ -20,26 +20,24 @@ public class AbstractBeamService { // Application Configurations private final Configuration config = ServiceConfigurations.createWithDefaults(); - private final String applicationName = - config.getString(ConfigurationKeys.APPLICATION_NAME); - + private final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); /** * Creates AbstractBeamService with options. */ - public AbstractBeamService(final String[] args) { //NOPMD + public AbstractBeamService(final String[] args) { // NOPMD super(); LOGGER.info("Pipeline options:"); for (final String s : args) { LOGGER.info("{}", s); } - options = PipelineOptionsFactory.fromArgs(args).create(); - options.setJobName(applicationName); - LOGGER.info("Starting BeamService with PipelineOptions {}:", this.options.toString()); + this.options = PipelineOptionsFactory.fromArgs(args).create(); + this.options.setJobName(this.applicationName); + LOGGER.info("Starting BeamService with PipelineOptions: {}", this.options.toString()); } public Configuration getConfig() { - return config; + return this.config; } } diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java index 83336b5a4c2451ef4bffefbd60ad9d52fccd9c17..e513c3a0e3dffcb9881f389af5ee9f05c52a2b63 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java @@ -6,6 +6,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.kafka.common.serialization.Deserializer; /** * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}. @@ -13,8 +14,7 @@ import org.apache.beam.sdk.values.PCollection; * @param <K> Type of the Key. * @param <V> Type of the Value. */ -public class KafkaGenericReader<K, V> extends - PTransform<PBegin, PCollection<KV<K, V>>> { +public class KafkaGenericReader<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> { private static final long serialVersionUID = 2603286150183186115L; private final PTransform<PBegin, PCollection<KV<K, V>>> reader; @@ -22,14 +22,12 @@ public class KafkaGenericReader<K, V> extends /** * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. */ - public KafkaGenericReader(final String bootstrapServer, final String inputTopic, - final Map<String, Object> consumerConfig, - final Class<? extends - org.apache.kafka.common.serialization.Deserializer<K>> - keyDeserializer, - final Class<? extends - org.apache.kafka.common.serialization.Deserializer<V>> - valueDeserializer) { + public KafkaGenericReader( + final String bootstrapServer, + final String inputTopic, + final Map<String, Object> consumerConfig, + final Class<? extends Deserializer<K>> keyDeserializer, + final Class<? extends Deserializer<V>> valueDeserializer) { super(); // Check if boostrap server and inputTopic are defined @@ -37,7 +35,7 @@ public class KafkaGenericReader<K, V> extends throw new IllegalArgumentException("bootstrapServer or inputTopic missing"); } - reader = + this.reader = KafkaIO.<K, V>read() .withBootstrapServers(bootstrapServer) .withTopic(inputTopic) diff --git a/theodolite-benchmarks/load-generator-commons/build.gradle b/theodolite-benchmarks/load-generator-commons/build.gradle index 2d8f77b5154b5b788e0729da69122b443740ce75..62542eeabc1cccb35fa9f71d1929b72956a56999 100644 --- a/theodolite-benchmarks/load-generator-commons/build.gradle +++ b/theodolite-benchmarks/load-generator-commons/build.gradle @@ -21,6 +21,9 @@ dependencies { implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'org.apache.kafka:kafka-streams:2.6.0' // TODO required? + implementation platform('com.google.cloud:libraries-bom:24.2.0') + implementation 'com.google.protobuf:protobuf-java-util' + implementation 'com.google.cloud:google-cloud-pubsub' // Use JUnit test framework testImplementation 'junit:junit:4.12' diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java index 7a60e271f04e396b2e0c69b1fcfee1d8a1ca8a7d..e94a11425eebc8180504a8a4f4ff582116623574 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java @@ -39,6 +39,12 @@ public final class ConfigurationKeys { public static final String HTTP_URL = "HTTP_URL"; + public static final String PUBSUB_INPUT_TOPIC = "PUBSUB_INPUT_TOPIC"; + + public static final String PUBSUB_PROJECT = "PUBSUB_PROJECT"; + + public static final String PUBSUB_EMULATOR_HOST = "PUBSUB_EMULATOR_HOST"; + private ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/EnvVarLoadGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/EnvVarLoadGeneratorFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..2901b68d8f3e6fa90cccfe15e7992aca67653f94 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/EnvVarLoadGeneratorFactory.java @@ -0,0 +1,138 @@ +package theodolite.commons.workloadgeneration; + +import java.net.URI; +import java.time.Duration; +import java.util.Objects; +import java.util.Properties; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import titan.ccp.model.records.ActivePowerRecord; + +class EnvVarLoadGeneratorFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(EnvVarLoadGeneratorFactory.class); + + public LoadGenerator create(final LoadGenerator loadGeneratorTemplate) { + final int numSensors = Integer.parseInt(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.NUM_SENSORS), + Integer.toString(LoadGenerator.NUMBER_OF_KEYS_DEFAULT))); + final int periodMs = Integer.parseInt(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.PERIOD_MS), + Integer.toString(LoadGenerator.PERIOD_MS_DEFAULT))); + final double value = Double.parseDouble(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.VALUE), + Integer.toString(LoadGenerator.VALUE_DEFAULT))); + final int threads = Integer.parseInt(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.THREADS), + Integer.toString(LoadGenerator.THREADS_DEFAULT))); + + return loadGeneratorTemplate + .setClusterConfig(this.buildClusterConfig()) + .setLoadDefinition(new WorkloadDefinition( + new KeySpace(LoadGenerator.SENSOR_PREFIX_DEFAULT, numSensors), + Duration.ofMillis(periodMs))) + .setGeneratorConfig(new LoadGeneratorConfig( + TitanRecordGenerator.forConstantValue(value), + this.buildRecordSender())) + .withThreads(threads); + } + + private ClusterConfig buildClusterConfig() { + final String bootstrapServer = System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER); + final String kubernetesDnsName = System.getenv(ConfigurationKeys.KUBERNETES_DNS_NAME); + + ClusterConfig clusterConfig; + if (bootstrapServer != null) { // NOPMD + clusterConfig = ClusterConfig.fromBootstrapServer(bootstrapServer); + LOGGER.info("Use bootstrap server '{}'.", bootstrapServer); + } else if (kubernetesDnsName != null) { // NOPMD + clusterConfig = ClusterConfig.fromKubernetesDnsName(kubernetesDnsName); + LOGGER.info("Use Kubernetes DNS name '{}'.", kubernetesDnsName); + } else { + clusterConfig = ClusterConfig.fromBootstrapServer(LoadGenerator.BOOTSTRAP_SERVER_DEFAULT); + LOGGER.info( + "Neither a bootstrap server nor a Kubernetes DNS name was provided. Use default bootstrap server '{}'.", // NOCS + LoadGenerator.BOOTSTRAP_SERVER_DEFAULT); + } + + final String port = System.getenv(ConfigurationKeys.PORT); + if (port != null) { + clusterConfig.setPort(Integer.parseInt(port)); + } + + final String portAutoIncrement = System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT); + if (portAutoIncrement != null) { + clusterConfig.setPortAutoIncrement(Boolean.parseBoolean(portAutoIncrement)); + } + + final String clusterNamePrefix = System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX); + if (clusterNamePrefix != null) { + clusterConfig.setClusterNamePrefix(portAutoIncrement); + } + + return clusterConfig; + } + + private RecordSender<ActivePowerRecord> buildRecordSender() { + final LoadGeneratorTarget target = LoadGeneratorTarget.from( + Objects.requireNonNullElse(System.getenv(ConfigurationKeys.TARGET), + LoadGenerator.TARGET_DEFAULT.getValue())); + + final RecordSender<ActivePowerRecord> recordSender; // NOPMD + if (target == LoadGeneratorTarget.KAFKA) { + final String kafkaBootstrapServers = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), + LoadGenerator.KAFKA_BOOTSTRAP_SERVERS_DEFAULT); + final String kafkaInputTopic = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), + LoadGenerator.KAFKA_TOPIC_DEFAULT); + final String schemaRegistryUrl = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), + LoadGenerator.SCHEMA_REGISTRY_URL_DEFAULT); + final Properties kafkaProperties = new Properties(); + kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE)); + kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS)); + kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY)); + recordSender = TitanKafkaSenderFactory.forKafkaConfig( + kafkaBootstrapServers, + kafkaInputTopic, + schemaRegistryUrl); + LOGGER.info( + "Use Kafka as target with bootstrap server '{}', schema registry url '{}' and topic '{}'.", // NOCS + kafkaBootstrapServers, schemaRegistryUrl, kafkaInputTopic); + } else if (target == LoadGeneratorTarget.HTTP) { + final URI url = URI.create( + Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.HTTP_URL), + LoadGenerator.HTTP_URI_DEFAULT)); + recordSender = new HttpRecordSender<>(url); + LOGGER.info("Use HTTP server as target with url '{}'.", url); + } else if (target == LoadGeneratorTarget.PUBSUB) { + final String project = System.getenv(ConfigurationKeys.PUBSUB_PROJECT); + final String inputTopic = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.PUBSUB_INPUT_TOPIC), + LoadGenerator.PUBSUB_TOPIC_DEFAULT); + final String emulatorHost = System.getenv(ConfigurationKeys.PUBSUB_EMULATOR_HOST); + if (emulatorHost != null) { // NOPMD + LOGGER.info("Use Pub/Sub as target with emulator host {} and topic '{}'.", + emulatorHost, + inputTopic); + recordSender = TitanPubSubSenderFactory.forEmulatedPubSubConfig(emulatorHost, inputTopic); + } else if (project != null) { // NOPMD + LOGGER.info("Use Pub/Sub as target with project {} and topic '{}'.", project, inputTopic); + recordSender = TitanPubSubSenderFactory.forPubSubConfig(project, inputTopic); + } else { + throw new IllegalStateException("Neither an emulator host nor a project was provided."); + } + } else { + // Should never happen + throw new IllegalStateException("Target " + target + " is not handled yet."); + } + return recordSender; + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java index 6453ef0bd3b6d5a3b5f7f2b77fa20da8f79cb35f..1f02a0e0c910f7d1821c92a0fa71f6d08dbbf6ad 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java @@ -1,32 +1,28 @@ package theodolite.commons.workloadgeneration; -import java.net.URI; import java.time.Duration; import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import titan.ccp.model.records.ActivePowerRecord; /** * A Theodolite load generator. */ public final class LoadGenerator { - private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - - private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; - private static final String SENSOR_PREFIX_DEFAULT = "s_"; - private static final int NUMBER_OF_KEYS_DEFAULT = 10; - private static final int PERIOD_MS_DEFAULT = 1000; - private static final int VALUE_DEFAULT = 10; - private static final int THREADS_DEFAULT = 4; - private static final LoadGeneratorTarget TARGET_DEFAULT = LoadGeneratorTarget.KAFKA; - private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; - private static final String KAFKA_TOPIC_DEFAULT = "input"; - private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD - private static final String HTTP_URI_DEFAULT = "http://localhost:8080"; + public static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; + public static final String SENSOR_PREFIX_DEFAULT = "s_"; + public static final int NUMBER_OF_KEYS_DEFAULT = 10; + public static final int PERIOD_MS_DEFAULT = 1000; + public static final int VALUE_DEFAULT = 10; + public static final int THREADS_DEFAULT = 4; + public static final LoadGeneratorTarget TARGET_DEFAULT = LoadGeneratorTarget.KAFKA; + // Target: HTTP + public static final String HTTP_URI_DEFAULT = "http://localhost:8080"; + // Target: Kafka + public static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; + public static final String KAFKA_TOPIC_DEFAULT = "input"; // NOCS + public static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD + // Target: Pub/Sub + public static final String PUBSUB_TOPIC_DEFAULT = "input"; // NOCS private ClusterConfig clusterConfig; private WorkloadDefinition loadDefinition; @@ -106,101 +102,7 @@ public final class LoadGenerator { * Create a basic {@link LoadGenerator} from environment variables. */ public static LoadGenerator fromEnvironment() { - final String bootstrapServer = System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER); - final String kubernetesDnsName = System.getenv(ConfigurationKeys.KUBERNETES_DNS_NAME); - - ClusterConfig clusterConfig; - if (bootstrapServer != null) { // NOPMD - clusterConfig = ClusterConfig.fromBootstrapServer(bootstrapServer); - LOGGER.info("Use bootstrap server '{}'.", bootstrapServer); - } else if (kubernetesDnsName != null) { // NOPMD - clusterConfig = ClusterConfig.fromKubernetesDnsName(kubernetesDnsName); - LOGGER.info("Use Kubernetes DNS name '{}'.", kubernetesDnsName); - } else { - clusterConfig = ClusterConfig.fromBootstrapServer(BOOTSTRAP_SERVER_DEFAULT); - LOGGER.info( - "Neither a bootstrap server nor a Kubernetes DNS name was provided. Use default bootstrap server '{}'.", // NOCS - BOOTSTRAP_SERVER_DEFAULT); - } - - final String port = System.getenv(ConfigurationKeys.PORT); - if (port != null) { - clusterConfig.setPort(Integer.parseInt(port)); - } - - final String portAutoIncrement = System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT); - if (portAutoIncrement != null) { - clusterConfig.setPortAutoIncrement(Boolean.parseBoolean(portAutoIncrement)); - } - - final String clusterNamePrefix = System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX); - if (clusterNamePrefix != null) { - clusterConfig.setClusterNamePrefix(portAutoIncrement); - } - - final LoadGeneratorTarget target = LoadGeneratorTarget.from( - Objects.requireNonNullElse(System.getenv(ConfigurationKeys.TARGET), - TARGET_DEFAULT.getValue())); - - final RecordSender<ActivePowerRecord> recordSender; // NOPMD - if (target == LoadGeneratorTarget.KAFKA) { - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - KAFKA_BOOTSTRAP_SERVERS_DEFAULT); - final String kafkaInputTopic = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), - KAFKA_TOPIC_DEFAULT); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - SCHEMA_REGISTRY_URL_DEFAULT); - final Properties kafkaProperties = new Properties(); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE)); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS)); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY)); - recordSender = TitanKafkaSenderFactory.forKafkaConfig( - kafkaBootstrapServers, - kafkaInputTopic, - schemaRegistryUrl); - LOGGER.info( - "Use Kafka as target with bootstrap server '{}', schema registry url '{}' and topic '{}'.", // NOCS - kafkaBootstrapServers, schemaRegistryUrl, kafkaInputTopic); - } else if (target == LoadGeneratorTarget.HTTP) { - final URI url = URI.create( - Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.HTTP_URL), - HTTP_URI_DEFAULT)); - recordSender = new HttpRecordSender<>(url); - LOGGER.info("Use HTTP server as target with url '{}'.", url); - } else { - // Should never happen - throw new IllegalStateException("Target " + target + " is not handled yet."); - } - - final int numSensors = Integer.parseInt(Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.NUM_SENSORS), - Integer.toString(NUMBER_OF_KEYS_DEFAULT))); - final int periodMs = Integer.parseInt(Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.PERIOD_MS), - Integer.toString(PERIOD_MS_DEFAULT))); - final double value = Double.parseDouble(Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.VALUE), - Integer.toString(VALUE_DEFAULT))); - final int threads = Integer.parseInt(Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.THREADS), - Integer.toString(THREADS_DEFAULT))); - - return new LoadGenerator() - .setClusterConfig(clusterConfig) - .setLoadDefinition(new WorkloadDefinition( - new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors), - Duration.ofMillis(periodMs))) - .setGeneratorConfig(new LoadGeneratorConfig( - TitanRecordGenerator.forConstantValue(value), - recordSender)) - .withThreads(threads); + return new EnvVarLoadGeneratorFactory().create(new LoadGenerator()); } } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java index 086e4de36301693c6873016122a47709b858a0d4..61ae6e86d1be63ca1f4ae3c362122235bb4662f0 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java @@ -4,7 +4,7 @@ import java.util.stream.Stream; enum LoadGeneratorTarget { - KAFKA("kafka"), HTTP("http"); + HTTP("http"), KAFKA("kafka"), PUBSUB("pubsub"); private final String value; diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java new file mode 100644 index 0000000000000000000000000000000000000000..ccbeb729236307b26538ee12b1a0e2373a7f0378 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java @@ -0,0 +1,205 @@ +package theodolite.commons.workloadgeneration; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.protobuf.util.Timestamps; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Sends monitoring records to Pub/Sub. + * + * @param <T> Record type to send + */ +public class PubSubRecordSender<T> implements RecordSender<T> { + + private static final int SHUTDOWN_TIMEOUT_SEC = 5; + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); + + private final Function<T, ByteBuffer> recordSerializer; + + private final Function<T, Long> timestampAccessor; + + private final Function<T, String> orderingKeyAccessor; + + private final Publisher publisher; + + private PubSubRecordSender(final Builder<T> builder) { + this.orderingKeyAccessor = builder.orderingKeyAccessor; + this.timestampAccessor = builder.timestampAccessor; + this.recordSerializer = builder.recordSerializer; + + try { + this.publisher = builder.buildPublisher(); + } catch (final IOException e) { + throw new IllegalStateException("Can not create Pub/Sub publisher.", e); + } + } + + /** + * Terminate this {@link PubSubRecordSender} and shutdown the underlying {@link Publisher}. + */ + public void terminate() { + this.publisher.shutdown(); + try { + this.publisher.awaitTermination(SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + throw new IllegalStateException(e); + } + } + + @Override + public void send(final T record) { + final ByteBuffer byteBuffer = this.recordSerializer.apply(record); + final ByteString data = ByteString.copyFrom(byteBuffer); + + final PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder().setData(data); + if (this.orderingKeyAccessor != null) { + messageBuilder.setOrderingKey(this.orderingKeyAccessor.apply(record)); + } + if (this.timestampAccessor != null) { + messageBuilder.setPublishTime(Timestamps.fromMillis(this.timestampAccessor.apply(record))); + } + final PubsubMessage message = messageBuilder.build(); + LOGGER.debug("Send message to PubSub topic {}: {}", this.publisher.getTopicName(), message); + final ApiFuture<String> publishResult = this.publisher.publish(message); + if (LOGGER.isDebugEnabled()) { + try { + LOGGER.debug("Publishing result is {}.", publishResult.get()); + } catch (InterruptedException | ExecutionException e) { + LOGGER.warn("Can not get publishing result.", e); + } + } + } + + /** + * Creates a {@link Builder} object for a {@link PubSubRecordSender}. + * + * @param project The project where to write. + * @param topic The topic where to write. + * @param recordSerializer A function serializing objects to {@link ByteBuffer}. + */ + public static <T> Builder<T> builderForProject( + final String project, + final String topic, + final Function<T, ByteBuffer> recordSerializer) { + return new Builder<>(project, topic, recordSerializer); + } + + /** + * Creates a {@link Builder} object for a {@link PubSubRecordSender}. + * + * @param emulatorHost Host of the emulator. + * @param topic The topic where to write. + * @param recordSerializer A function serializing objects to {@link ByteBuffer}. + */ + public static <T> Builder<T> builderForEmulator( + final String emulatorHost, + final String topic, + final Function<T, ByteBuffer> recordSerializer) { + return new WithEmulatorBuilder<>(emulatorHost, topic, recordSerializer); + } + + /** + * Builder class to build a new {@link PubSubRecordSender}. + * + * @param <T> Type of the records that should later be send. + */ + public static class Builder<T> { + + protected final TopicName topicName; + private final Function<T, ByteBuffer> recordSerializer; // NOPMD + private Function<T, Long> timestampAccessor = null; // NOPMD + private Function<T, String> orderingKeyAccessor = null; // NOPMD + + /** + * Creates a Builder object for a {@link PubSubRecordSender}. + * + * @param topic The topic where to write. + * @param recordSerializer A function serializing objects to {@link ByteBuffer}. + */ + private Builder( + final String project, + final String topic, + final Function<T, ByteBuffer> recordSerializer) { + this.topicName = TopicName.of(project, topic); + this.recordSerializer = recordSerializer; + } + + public Builder<T> timestampAccessor(final Function<T, Long> timestampAccessor) { + this.timestampAccessor = timestampAccessor; + return this; + } + + public Builder<T> orderingKeyAccessor(final Function<T, String> keyAccessor) { + this.orderingKeyAccessor = keyAccessor; + return this; + } + + public PubSubRecordSender<T> build() { + return new PubSubRecordSender<>(this); + } + + protected Publisher buildPublisher() throws IOException { + return Publisher.newBuilder(this.topicName).build(); + } + + } + + private static class WithEmulatorBuilder<T> extends Builder<T> { + + private static final String DUMMY_PROJECT = "dummy-project-id"; + + private final String emulatorHost; + + /** + * Creates a Builder object for a {@link PubSubRecordSender}. + * + * @param emulatorHost host of the emulator. + * @param topic The topic where to write. + * @param recordSerializer A function serializing objects to {@link ByteBuffer}. + */ + private WithEmulatorBuilder( + final String emulatorHost, + final String topic, + final Function<T, ByteBuffer> recordSerializer) { + super(DUMMY_PROJECT, topic, recordSerializer); + this.emulatorHost = emulatorHost; + } + + @Override + protected Publisher buildPublisher() throws IOException { + final ManagedChannel channel = ManagedChannelBuilder + .forTarget(this.emulatorHost) + .usePlaintext() + .build(); + + final TransportChannelProvider channelProvider = FixedTransportChannelProvider + .create(GrpcTransportChannel.create(channel)); + final CredentialsProvider credentialsProvider = NoCredentialsProvider.create(); + + return Publisher.newBuilder(super.topicName) + .setChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .build(); + } + + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..5a18376ab4c2fcbf896f847c0ed34af69c5eb507 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java @@ -0,0 +1,50 @@ +package theodolite.commons.workloadgeneration; + +import java.io.IOException; +import java.nio.ByteBuffer; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A factory for creating {@link PubSubRecordSender}s that sends Titan {@link ActivePowerRecord}s. + */ +public final class TitanPubSubSenderFactory { + + private TitanPubSubSenderFactory() {} + + /** + * Create a new {@link PubSubRecordSender} for {@link ActivePowerRecord}s for the given Pub/Sub + * configuration. + */ + public static PubSubRecordSender<ActivePowerRecord> forPubSubConfig( + final String project, + final String topic) { + return PubSubRecordSender + .builderForProject(project, topic, TitanPubSubSenderFactory::serialize) + // .orderingKeyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) + .build(); + } + + /** + * Create a new {@link PubSubRecordSender} for {@link ActivePowerRecord}s for the given PubSub + * emulator configuration. + */ + public static PubSubRecordSender<ActivePowerRecord> forEmulatedPubSubConfig( + final String emulatorHost, + final String topic) { + return PubSubRecordSender + .builderForEmulator(emulatorHost, topic, TitanPubSubSenderFactory::serialize) + // .orderingKeyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) + .build(); + } + + private static ByteBuffer serialize(final ActivePowerRecord record) { + try { + return record.toByteBuffer(); + } catch (final IOException e) { + throw new IllegalStateException(e); + } + } + +} diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties index 50db1510ab5d7f6b8c9b1a75f112719209c351ce..70cc5e94a64b8218344263d9d9d2ba3421fd69fd 100644 --- a/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties @@ -1,6 +1,8 @@ application.name=theodolite-uc1-application application.version=0.0.1 +sink.type=logger + kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output @@ -13,4 +15,4 @@ cache.max.bytes.buffering=-1 specific.avro.reader=True enable.auto.commit.config=True -auto.offset.reset.config=earliest \ No newline at end of file +auto.offset.reset.config=earliest diff --git a/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties index 50db1510ab5d7f6b8c9b1a75f112719209c351ce..70cc5e94a64b8218344263d9d9d2ba3421fd69fd 100644 --- a/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties @@ -1,6 +1,8 @@ application.name=theodolite-uc1-application application.version=0.0.1 +sink.type=logger + kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output @@ -13,4 +15,4 @@ cache.max.bytes.buffering=-1 specific.avro.reader=True enable.auto.commit.config=True -auto.offset.reset.config=earliest \ No newline at end of file +auto.offset.reset.config=earliest