Skip to content
Snippets Groups Projects
Commit 7e34e121 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'master' into refactor-beam-service

parents dd0aa284 f95b7b7a
No related branches found
No related tags found
1 merge request!249Align package structure among all benchmark implementations
Showing
with 450 additions and 144 deletions
......@@ -47,7 +47,7 @@ The prebuilt container images can be configured with the following environment v
| `PORT` | Port used for for coordination among load generator instances. | 5701 |
| `PORT_AUTO_INCREMENT` | If set to true and the specified PORT is already used, use the next higher one. Useful if multiple instances should run on the same host, without configuring each instance individually. | true |
| `CLUSTER_NAME_PREFIX` | Only required if unrelated load generators form a cluster. | theodolite-load-generation |
| `TARGET` | The target system the load generator send messages to. Valid values are: `kafka`, `http`. | `kafka` |
| `TARGET` | The target system the load generator send messages to. Valid values are: `kafka`, `http` and `pubsub`. | `kafka` |
| `KAFKA_BOOTSTRAP_SERVERS` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. See [Kafka producer config: `bootstrap.servers`](https://kafka.apache.org/documentation/#producerconfigs_bootstrap.servers) for more information. Only used if Kafka is set as `TARGET`. | `localhost:9092` |
| `KAFKA_INPUT_TOPIC` | Name of the Kafka topic, which should receive the generated messages. Only used if Kafka is set as `TARGET`. | input |
| `SCHEMA_REGISTRY_URL` | URL of the [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry). | `http://localhost:8081` |
......@@ -55,6 +55,9 @@ The prebuilt container images can be configured with the following environment v
| `KAFKA_LINGER_MS` | Value for the Kafka producer configuration: [`linger.ms`](https://kafka.apache.org/documentation/#producerconfigs_linger.ms). Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`linger.ms`](https://kafka.apache.org/documentation/#producerconfigs_linger.ms) |
| `KAFKA_BUFFER_MEMORY` | Value for the Kafka producer configuration: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) |
| `HTTP_URL` | The URL the load generator should post messages to. Only used if HTTP is set as `TARGET`. | |
| `PUBSUB_INPUT_TOPIC` | The Google Cloud Pub/Sub topic to write messages to. Only used if Pub/Sub is set as `TARGET`. | input |
| `PUBSUB_PROJECT` | The Google Cloud this Pub/Sub topic is associated with. Only used if Pub/Sub is set as `TARGET`. | |
| `PUBSUB_EMULATOR_HOST` | A Pub/Sub emulator host. Only used if Pub/Sub is set as `TARGET`. | |
| `NUM_SENSORS` | The amount of simulated sensors. | 10 |
| `PERIOD_MS` | The time in milliseconds between generating two messages for the same sensor. With our Theodolite benchmarks, we apply an [open workload model](https://www.usenix.org/legacy/event/nsdi06/tech/full_papers/schroeder/schroeder.pdf) in which new messages are generated at a fixed rate, without considering the think time of the target server nor the time required for generating a message. | 1000 |
| `VALUE` | The constant `valueInW` of an `ActivePowerRecord`. | 10 |
......@@ -64,10 +67,10 @@ Please note that there are some additional configuration options for benchmark [
## Creating a custom load generator
To create a custom load generator, you need to import the [load-generator-commons](https://github.com/cau-se/theodolite/tree/master/theodolite-benchmarks/load-generator-commons) project. You can then create an instance of the `LoadGenerator` object and call its `run` method:
To create a custom load generator, you need to import the [load-generator-commons](https://github.com/cau-se/theodolite/tree/master/theodolite-benchmarks/load-generator-commons) project. You can then create an instance of the `LoadGenerator` populated with a default configuration, adjust it as desired, and start it by calling its `run` method:
```java
LoadGenerator loadGenerator = new LoadGenerator()
LoadGenerator loadGenerator = new LoadGenerator.fromDefaults()
.setClusterConfig(clusterConfig)
.setLoadDefinition(new WorkloadDefinition(
new KeySpace(key_prefix, numSensors),
......@@ -79,9 +82,8 @@ LoadGenerator loadGenerator = new LoadGenerator()
loadGenerator.run();
```
Alternatively, you can also start with a load generator populated with a default configuration or created from environment variables and then adjust the `LoadGenerator` as desired:
Alternatively, you can also start with a `LoadGenerator` created from environment variables and, optionally, adjust it as desired:
```java
LoadGenerator loadGeneratorFromDefaults = LoadGenerator.fromDefaults()
LoadGenerator loadGeneratorFromEnv = LoadGenerator.fromEnvironment();
LoadGenerator loadGenerator = LoadGenerator.fromEnvironment();
```
......@@ -8,8 +8,8 @@ import org.slf4j.LoggerFactory;
import titan.ccp.common.configuration.ServiceConfigurations;
/**
* Abstraction of a Beam microservice.
* Encapsulates the corresponding {@link PipelineOptions} and the beam Runner.
* Abstraction of a Beam microservice. Encapsulates the corresponding {@link PipelineOptions} and
* the beam Runner.
*/
public class AbstractBeamService {
......@@ -20,9 +20,7 @@ public class AbstractBeamService {
// Application Configurations
private final Configuration config = ServiceConfigurations.createWithDefaults();
private final String applicationName =
config.getString(ConfigurationKeys.APPLICATION_NAME);
private final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
/**
* Creates AbstractBeamService with options.
......@@ -33,13 +31,13 @@ public class AbstractBeamService {
for (final String s : args) {
LOGGER.info("{}", s);
}
options = PipelineOptionsFactory.fromArgs(args).create();
options.setJobName(applicationName);
LOGGER.info("Starting BeamService with PipelineOptions {}:", this.options.toString());
this.options = PipelineOptionsFactory.fromArgs(args).create();
this.options.setJobName(this.applicationName);
LOGGER.info("Starting BeamService with PipelineOptions: {}", this.options.toString());
}
public Configuration getConfig() {
return config;
return this.config;
}
}
......@@ -6,6 +6,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.Deserializer;
/**
* Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
......@@ -13,8 +14,7 @@ import org.apache.beam.sdk.values.PCollection;
* @param <K> Type of the Key.
* @param <V> Type of the Value.
*/
public class KafkaGenericReader<K, V> extends
PTransform<PBegin, PCollection<KV<K, V>>> {
public class KafkaGenericReader<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
private static final long serialVersionUID = 2603286150183186115L;
private final PTransform<PBegin, PCollection<KV<K, V>>> reader;
......@@ -22,14 +22,12 @@ public class KafkaGenericReader<K, V> extends
/**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/
public KafkaGenericReader(final String bootstrapServer, final String inputTopic,
public KafkaGenericReader(
final String bootstrapServer,
final String inputTopic,
final Map<String, Object> consumerConfig,
final Class<? extends
org.apache.kafka.common.serialization.Deserializer<K>>
keyDeserializer,
final Class<? extends
org.apache.kafka.common.serialization.Deserializer<V>>
valueDeserializer) {
final Class<? extends Deserializer<K>> keyDeserializer,
final Class<? extends Deserializer<V>> valueDeserializer) {
super();
// Check if boostrap server and inputTopic are defined
......@@ -37,7 +35,7 @@ public class KafkaGenericReader<K, V> extends
throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
}
reader =
this.reader =
KafkaIO.<K, V>read()
.withBootstrapServers(bootstrapServer)
.withTopic(inputTopic)
......
......@@ -21,6 +21,9 @@ dependencies {
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'org.apache.kafka:kafka-streams:2.6.0' // TODO required?
implementation platform('com.google.cloud:libraries-bom:24.2.0')
implementation 'com.google.protobuf:protobuf-java-util'
implementation 'com.google.cloud:google-cloud-pubsub'
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
......
......@@ -39,6 +39,12 @@ public final class ConfigurationKeys {
public static final String HTTP_URL = "HTTP_URL";
public static final String PUBSUB_INPUT_TOPIC = "PUBSUB_INPUT_TOPIC";
public static final String PUBSUB_PROJECT = "PUBSUB_PROJECT";
public static final String PUBSUB_EMULATOR_HOST = "PUBSUB_EMULATOR_HOST";
private ConfigurationKeys() {}
}
package theodolite.commons.workloadgeneration;
import java.net.URI;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.model.records.ActivePowerRecord;
class EnvVarLoadGeneratorFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(EnvVarLoadGeneratorFactory.class);
public LoadGenerator create(final LoadGenerator loadGeneratorTemplate) {
final int numSensors = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.NUM_SENSORS),
Integer.toString(LoadGenerator.NUMBER_OF_KEYS_DEFAULT)));
final int periodMs = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.PERIOD_MS),
Integer.toString(LoadGenerator.PERIOD_MS_DEFAULT)));
final double value = Double.parseDouble(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.VALUE),
Integer.toString(LoadGenerator.VALUE_DEFAULT)));
final int threads = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.THREADS),
Integer.toString(LoadGenerator.THREADS_DEFAULT)));
return loadGeneratorTemplate
.setClusterConfig(this.buildClusterConfig())
.setLoadDefinition(new WorkloadDefinition(
new KeySpace(LoadGenerator.SENSOR_PREFIX_DEFAULT, numSensors),
Duration.ofMillis(periodMs)))
.setGeneratorConfig(new LoadGeneratorConfig(
TitanRecordGenerator.forConstantValue(value),
this.buildRecordSender()))
.withThreads(threads);
}
private ClusterConfig buildClusterConfig() {
final String bootstrapServer = System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER);
final String kubernetesDnsName = System.getenv(ConfigurationKeys.KUBERNETES_DNS_NAME);
ClusterConfig clusterConfig;
if (bootstrapServer != null) { // NOPMD
clusterConfig = ClusterConfig.fromBootstrapServer(bootstrapServer);
LOGGER.info("Use bootstrap server '{}'.", bootstrapServer);
} else if (kubernetesDnsName != null) { // NOPMD
clusterConfig = ClusterConfig.fromKubernetesDnsName(kubernetesDnsName);
LOGGER.info("Use Kubernetes DNS name '{}'.", kubernetesDnsName);
} else {
clusterConfig = ClusterConfig.fromBootstrapServer(LoadGenerator.BOOTSTRAP_SERVER_DEFAULT);
LOGGER.info(
"Neither a bootstrap server nor a Kubernetes DNS name was provided. Use default bootstrap server '{}'.", // NOCS
LoadGenerator.BOOTSTRAP_SERVER_DEFAULT);
}
final String port = System.getenv(ConfigurationKeys.PORT);
if (port != null) {
clusterConfig.setPort(Integer.parseInt(port));
}
final String portAutoIncrement = System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT);
if (portAutoIncrement != null) {
clusterConfig.setPortAutoIncrement(Boolean.parseBoolean(portAutoIncrement));
}
final String clusterNamePrefix = System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX);
if (clusterNamePrefix != null) {
clusterConfig.setClusterNamePrefix(portAutoIncrement);
}
return clusterConfig;
}
private RecordSender<ActivePowerRecord> buildRecordSender() {
final LoadGeneratorTarget target = LoadGeneratorTarget.from(
Objects.requireNonNullElse(System.getenv(ConfigurationKeys.TARGET),
LoadGenerator.TARGET_DEFAULT.getValue()));
final RecordSender<ActivePowerRecord> recordSender; // NOPMD
if (target == LoadGeneratorTarget.KAFKA) {
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
LoadGenerator.KAFKA_BOOTSTRAP_SERVERS_DEFAULT);
final String kafkaInputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC),
LoadGenerator.KAFKA_TOPIC_DEFAULT);
final String schemaRegistryUrl = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
LoadGenerator.SCHEMA_REGISTRY_URL_DEFAULT);
final Properties kafkaProperties = new Properties();
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE));
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS));
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY));
recordSender = TitanKafkaSenderFactory.forKafkaConfig(
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl);
LOGGER.info(
"Use Kafka as target with bootstrap server '{}', schema registry url '{}' and topic '{}'.", // NOCS
kafkaBootstrapServers, schemaRegistryUrl, kafkaInputTopic);
} else if (target == LoadGeneratorTarget.HTTP) {
final URI url = URI.create(
Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.HTTP_URL),
LoadGenerator.HTTP_URI_DEFAULT));
recordSender = new HttpRecordSender<>(url);
LOGGER.info("Use HTTP server as target with url '{}'.", url);
} else if (target == LoadGeneratorTarget.PUBSUB) {
final String project = System.getenv(ConfigurationKeys.PUBSUB_PROJECT);
final String inputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.PUBSUB_INPUT_TOPIC),
LoadGenerator.PUBSUB_TOPIC_DEFAULT);
final String emulatorHost = System.getenv(ConfigurationKeys.PUBSUB_EMULATOR_HOST);
if (emulatorHost != null) { // NOPMD
LOGGER.info("Use Pub/Sub as target with emulator host {} and topic '{}'.",
emulatorHost,
inputTopic);
recordSender = TitanPubSubSenderFactory.forEmulatedPubSubConfig(emulatorHost, inputTopic);
} else if (project != null) { // NOPMD
LOGGER.info("Use Pub/Sub as target with project {} and topic '{}'.", project, inputTopic);
recordSender = TitanPubSubSenderFactory.forPubSubConfig(project, inputTopic);
} else {
throw new IllegalStateException("Neither an emulator host nor a project was provided.");
}
} else {
// Should never happen
throw new IllegalStateException("Target " + target + " is not handled yet.");
}
return recordSender;
}
}
package theodolite.commons.workloadgeneration;
import java.net.URI;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A Theodolite load generator.
*/
public final class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701";
private static final String SENSOR_PREFIX_DEFAULT = "s_";
private static final int NUMBER_OF_KEYS_DEFAULT = 10;
private static final int PERIOD_MS_DEFAULT = 1000;
private static final int VALUE_DEFAULT = 10;
private static final int THREADS_DEFAULT = 4;
private static final LoadGeneratorTarget TARGET_DEFAULT = LoadGeneratorTarget.KAFKA;
private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
private static final String KAFKA_TOPIC_DEFAULT = "input";
private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD
private static final String HTTP_URI_DEFAULT = "http://localhost:8080";
public static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701";
public static final String SENSOR_PREFIX_DEFAULT = "s_";
public static final int NUMBER_OF_KEYS_DEFAULT = 10;
public static final int PERIOD_MS_DEFAULT = 1000;
public static final int VALUE_DEFAULT = 10;
public static final int THREADS_DEFAULT = 4;
public static final LoadGeneratorTarget TARGET_DEFAULT = LoadGeneratorTarget.KAFKA;
// Target: HTTP
public static final String HTTP_URI_DEFAULT = "http://localhost:8080";
// Target: Kafka
public static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
public static final String KAFKA_TOPIC_DEFAULT = "input"; // NOCS
public static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD
// Target: Pub/Sub
public static final String PUBSUB_TOPIC_DEFAULT = "input"; // NOCS
private ClusterConfig clusterConfig;
private WorkloadDefinition loadDefinition;
......@@ -106,101 +102,7 @@ public final class LoadGenerator {
* Create a basic {@link LoadGenerator} from environment variables.
*/
public static LoadGenerator fromEnvironment() {
final String bootstrapServer = System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER);
final String kubernetesDnsName = System.getenv(ConfigurationKeys.KUBERNETES_DNS_NAME);
ClusterConfig clusterConfig;
if (bootstrapServer != null) { // NOPMD
clusterConfig = ClusterConfig.fromBootstrapServer(bootstrapServer);
LOGGER.info("Use bootstrap server '{}'.", bootstrapServer);
} else if (kubernetesDnsName != null) { // NOPMD
clusterConfig = ClusterConfig.fromKubernetesDnsName(kubernetesDnsName);
LOGGER.info("Use Kubernetes DNS name '{}'.", kubernetesDnsName);
} else {
clusterConfig = ClusterConfig.fromBootstrapServer(BOOTSTRAP_SERVER_DEFAULT);
LOGGER.info(
"Neither a bootstrap server nor a Kubernetes DNS name was provided. Use default bootstrap server '{}'.", // NOCS
BOOTSTRAP_SERVER_DEFAULT);
}
final String port = System.getenv(ConfigurationKeys.PORT);
if (port != null) {
clusterConfig.setPort(Integer.parseInt(port));
}
final String portAutoIncrement = System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT);
if (portAutoIncrement != null) {
clusterConfig.setPortAutoIncrement(Boolean.parseBoolean(portAutoIncrement));
}
final String clusterNamePrefix = System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX);
if (clusterNamePrefix != null) {
clusterConfig.setClusterNamePrefix(portAutoIncrement);
}
final LoadGeneratorTarget target = LoadGeneratorTarget.from(
Objects.requireNonNullElse(System.getenv(ConfigurationKeys.TARGET),
TARGET_DEFAULT.getValue()));
final RecordSender<ActivePowerRecord> recordSender; // NOPMD
if (target == LoadGeneratorTarget.KAFKA) {
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
KAFKA_BOOTSTRAP_SERVERS_DEFAULT);
final String kafkaInputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC),
KAFKA_TOPIC_DEFAULT);
final String schemaRegistryUrl = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
SCHEMA_REGISTRY_URL_DEFAULT);
final Properties kafkaProperties = new Properties();
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE));
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS));
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG,
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY));
recordSender = TitanKafkaSenderFactory.forKafkaConfig(
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl);
LOGGER.info(
"Use Kafka as target with bootstrap server '{}', schema registry url '{}' and topic '{}'.", // NOCS
kafkaBootstrapServers, schemaRegistryUrl, kafkaInputTopic);
} else if (target == LoadGeneratorTarget.HTTP) {
final URI url = URI.create(
Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.HTTP_URL),
HTTP_URI_DEFAULT));
recordSender = new HttpRecordSender<>(url);
LOGGER.info("Use HTTP server as target with url '{}'.", url);
} else {
// Should never happen
throw new IllegalStateException("Target " + target + " is not handled yet.");
}
final int numSensors = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.NUM_SENSORS),
Integer.toString(NUMBER_OF_KEYS_DEFAULT)));
final int periodMs = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.PERIOD_MS),
Integer.toString(PERIOD_MS_DEFAULT)));
final double value = Double.parseDouble(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.VALUE),
Integer.toString(VALUE_DEFAULT)));
final int threads = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.THREADS),
Integer.toString(THREADS_DEFAULT)));
return new LoadGenerator()
.setClusterConfig(clusterConfig)
.setLoadDefinition(new WorkloadDefinition(
new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors),
Duration.ofMillis(periodMs)))
.setGeneratorConfig(new LoadGeneratorConfig(
TitanRecordGenerator.forConstantValue(value),
recordSender))
.withThreads(threads);
return new EnvVarLoadGeneratorFactory().create(new LoadGenerator());
}
}
......@@ -4,7 +4,7 @@ import java.util.stream.Stream;
enum LoadGeneratorTarget {
KAFKA("kafka"), HTTP("http");
HTTP("http"), KAFKA("kafka"), PUBSUB("pubsub");
private final String value;
......
package theodolite.commons.workloadgeneration;
import com.google.api.core.ApiFuture;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Sends monitoring records to Pub/Sub.
*
* @param <T> Record type to send
*/
public class PubSubRecordSender<T> implements RecordSender<T> {
private static final int SHUTDOWN_TIMEOUT_SEC = 5;
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
private final Function<T, ByteBuffer> recordSerializer;
private final Function<T, Long> timestampAccessor;
private final Function<T, String> orderingKeyAccessor;
private final Publisher publisher;
private PubSubRecordSender(final Builder<T> builder) {
this.orderingKeyAccessor = builder.orderingKeyAccessor;
this.timestampAccessor = builder.timestampAccessor;
this.recordSerializer = builder.recordSerializer;
try {
this.publisher = builder.buildPublisher();
} catch (final IOException e) {
throw new IllegalStateException("Can not create Pub/Sub publisher.", e);
}
}
/**
* Terminate this {@link PubSubRecordSender} and shutdown the underlying {@link Publisher}.
*/
public void terminate() {
this.publisher.shutdown();
try {
this.publisher.awaitTermination(SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
throw new IllegalStateException(e);
}
}
@Override
public void send(final T record) {
final ByteBuffer byteBuffer = this.recordSerializer.apply(record);
final ByteString data = ByteString.copyFrom(byteBuffer);
final PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder().setData(data);
if (this.orderingKeyAccessor != null) {
messageBuilder.setOrderingKey(this.orderingKeyAccessor.apply(record));
}
if (this.timestampAccessor != null) {
messageBuilder.setPublishTime(Timestamps.fromMillis(this.timestampAccessor.apply(record)));
}
final PubsubMessage message = messageBuilder.build();
LOGGER.debug("Send message to PubSub topic {}: {}", this.publisher.getTopicName(), message);
final ApiFuture<String> publishResult = this.publisher.publish(message);
if (LOGGER.isDebugEnabled()) {
try {
LOGGER.debug("Publishing result is {}.", publishResult.get());
} catch (InterruptedException | ExecutionException e) {
LOGGER.warn("Can not get publishing result.", e);
}
}
}
/**
* Creates a {@link Builder} object for a {@link PubSubRecordSender}.
*
* @param project The project where to write.
* @param topic The topic where to write.
* @param recordSerializer A function serializing objects to {@link ByteBuffer}.
*/
public static <T> Builder<T> builderForProject(
final String project,
final String topic,
final Function<T, ByteBuffer> recordSerializer) {
return new Builder<>(project, topic, recordSerializer);
}
/**
* Creates a {@link Builder} object for a {@link PubSubRecordSender}.
*
* @param emulatorHost Host of the emulator.
* @param topic The topic where to write.
* @param recordSerializer A function serializing objects to {@link ByteBuffer}.
*/
public static <T> Builder<T> builderForEmulator(
final String emulatorHost,
final String topic,
final Function<T, ByteBuffer> recordSerializer) {
return new WithEmulatorBuilder<>(emulatorHost, topic, recordSerializer);
}
/**
* Builder class to build a new {@link PubSubRecordSender}.
*
* @param <T> Type of the records that should later be send.
*/
public static class Builder<T> {
protected final TopicName topicName;
private final Function<T, ByteBuffer> recordSerializer; // NOPMD
private Function<T, Long> timestampAccessor = null; // NOPMD
private Function<T, String> orderingKeyAccessor = null; // NOPMD
/**
* Creates a Builder object for a {@link PubSubRecordSender}.
*
* @param topic The topic where to write.
* @param recordSerializer A function serializing objects to {@link ByteBuffer}.
*/
private Builder(
final String project,
final String topic,
final Function<T, ByteBuffer> recordSerializer) {
this.topicName = TopicName.of(project, topic);
this.recordSerializer = recordSerializer;
}
public Builder<T> timestampAccessor(final Function<T, Long> timestampAccessor) {
this.timestampAccessor = timestampAccessor;
return this;
}
public Builder<T> orderingKeyAccessor(final Function<T, String> keyAccessor) {
this.orderingKeyAccessor = keyAccessor;
return this;
}
public PubSubRecordSender<T> build() {
return new PubSubRecordSender<>(this);
}
protected Publisher buildPublisher() throws IOException {
return Publisher.newBuilder(this.topicName).build();
}
}
private static class WithEmulatorBuilder<T> extends Builder<T> {
private static final String DUMMY_PROJECT = "dummy-project-id";
private final String emulatorHost;
/**
* Creates a Builder object for a {@link PubSubRecordSender}.
*
* @param emulatorHost host of the emulator.
* @param topic The topic where to write.
* @param recordSerializer A function serializing objects to {@link ByteBuffer}.
*/
private WithEmulatorBuilder(
final String emulatorHost,
final String topic,
final Function<T, ByteBuffer> recordSerializer) {
super(DUMMY_PROJECT, topic, recordSerializer);
this.emulatorHost = emulatorHost;
}
@Override
protected Publisher buildPublisher() throws IOException {
final ManagedChannel channel = ManagedChannelBuilder
.forTarget(this.emulatorHost)
.usePlaintext()
.build();
final TransportChannelProvider channelProvider = FixedTransportChannelProvider
.create(GrpcTransportChannel.create(channel));
final CredentialsProvider credentialsProvider = NoCredentialsProvider.create();
return Publisher.newBuilder(super.topicName)
.setChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build();
}
}
}
package theodolite.commons.workloadgeneration;
import java.io.IOException;
import java.nio.ByteBuffer;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A factory for creating {@link PubSubRecordSender}s that sends Titan {@link ActivePowerRecord}s.
*/
public final class TitanPubSubSenderFactory {
private TitanPubSubSenderFactory() {}
/**
* Create a new {@link PubSubRecordSender} for {@link ActivePowerRecord}s for the given Pub/Sub
* configuration.
*/
public static PubSubRecordSender<ActivePowerRecord> forPubSubConfig(
final String project,
final String topic) {
return PubSubRecordSender
.builderForProject(project, topic, TitanPubSubSenderFactory::serialize)
// .orderingKeyAccessor(r -> r.getIdentifier())
.timestampAccessor(r -> r.getTimestamp())
.build();
}
/**
* Create a new {@link PubSubRecordSender} for {@link ActivePowerRecord}s for the given PubSub
* emulator configuration.
*/
public static PubSubRecordSender<ActivePowerRecord> forEmulatedPubSubConfig(
final String emulatorHost,
final String topic) {
return PubSubRecordSender
.builderForEmulator(emulatorHost, topic, TitanPubSubSenderFactory::serialize)
// .orderingKeyAccessor(r -> r.getIdentifier())
.timestampAccessor(r -> r.getTimestamp())
.build();
}
private static ByteBuffer serialize(final ActivePowerRecord record) {
try {
return record.toByteBuffer();
} catch (final IOException e) {
throw new IllegalStateException(e);
}
}
}
application.name=theodolite-uc1-application
application.version=0.0.1
sink.type=logger
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
......
application.name=theodolite-uc1-application
application.version=0.0.1
sink.type=logger
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment