diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java index 7a60e271f04e396b2e0c69b1fcfee1d8a1ca8a7d..d4b61845d1c6206cd06ded8c77279024622b0558 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java @@ -39,6 +39,8 @@ public final class ConfigurationKeys { public static final String HTTP_URL = "HTTP_URL"; + public static final String PUBSUB_INPUT_TOPIC = "PUBSUB_INPUT_TOPIC"; + private ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/EnvVarLoadGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/EnvVarLoadGeneratorFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..d6f6707df5269c4f57eeef374e503a8ac102326c --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/EnvVarLoadGeneratorFactory.java @@ -0,0 +1,127 @@ +package theodolite.commons.workloadgeneration; + +import java.net.URI; +import java.time.Duration; +import java.util.Objects; +import java.util.Properties; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import titan.ccp.model.records.ActivePowerRecord; + +class EnvVarLoadGeneratorFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(EnvVarLoadGeneratorFactory.class); + + public LoadGenerator create() { + final int numSensors = Integer.parseInt(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.NUM_SENSORS), + Integer.toString(LoadGenerator.NUMBER_OF_KEYS_DEFAULT))); + final int periodMs = Integer.parseInt(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.PERIOD_MS), + Integer.toString(LoadGenerator.PERIOD_MS_DEFAULT))); + final double value = Double.parseDouble(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.VALUE), + Integer.toString(LoadGenerator.VALUE_DEFAULT))); + final int threads = Integer.parseInt(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.THREADS), + Integer.toString(LoadGenerator.THREADS_DEFAULT))); + + return LoadGenerator.fromDefaults() + .setClusterConfig(this.buildClusterConfig()) + .setLoadDefinition(new WorkloadDefinition( + new KeySpace(LoadGenerator.SENSOR_PREFIX_DEFAULT, numSensors), + Duration.ofMillis(periodMs))) + .setGeneratorConfig(new LoadGeneratorConfig( + TitanRecordGenerator.forConstantValue(value), + this.buildRecordSender())) + .withThreads(threads); + } + + private ClusterConfig buildClusterConfig() { + final String bootstrapServer = System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER); + final String kubernetesDnsName = System.getenv(ConfigurationKeys.KUBERNETES_DNS_NAME); + + ClusterConfig clusterConfig; + if (bootstrapServer != null) { // NOPMD + clusterConfig = ClusterConfig.fromBootstrapServer(bootstrapServer); + LOGGER.info("Use bootstrap server '{}'.", bootstrapServer); + } else if (kubernetesDnsName != null) { // NOPMD + clusterConfig = ClusterConfig.fromKubernetesDnsName(kubernetesDnsName); + LOGGER.info("Use Kubernetes DNS name '{}'.", kubernetesDnsName); + } else { + clusterConfig = ClusterConfig.fromBootstrapServer(LoadGenerator.BOOTSTRAP_SERVER_DEFAULT); + LOGGER.info( + "Neither a bootstrap server nor a Kubernetes DNS name was provided. Use default bootstrap server '{}'.", // NOCS + LoadGenerator.BOOTSTRAP_SERVER_DEFAULT); + } + + final String port = System.getenv(ConfigurationKeys.PORT); + if (port != null) { + clusterConfig.setPort(Integer.parseInt(port)); + } + + final String portAutoIncrement = System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT); + if (portAutoIncrement != null) { + clusterConfig.setPortAutoIncrement(Boolean.parseBoolean(portAutoIncrement)); + } + + final String clusterNamePrefix = System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX); + if (clusterNamePrefix != null) { + clusterConfig.setClusterNamePrefix(portAutoIncrement); + } + + return clusterConfig; + } + + private RecordSender<ActivePowerRecord> buildRecordSender() { + final LoadGeneratorTarget target = LoadGeneratorTarget.from( + Objects.requireNonNullElse(System.getenv(ConfigurationKeys.TARGET), + LoadGenerator.TARGET_DEFAULT.getValue())); + + final RecordSender<ActivePowerRecord> recordSender; // NOPMD + if (target == LoadGeneratorTarget.KAFKA) { + final String kafkaBootstrapServers = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), + LoadGenerator.KAFKA_BOOTSTRAP_SERVERS_DEFAULT); + final String kafkaInputTopic = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), + LoadGenerator.KAFKA_TOPIC_DEFAULT); + final String schemaRegistryUrl = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), + LoadGenerator.SCHEMA_REGISTRY_URL_DEFAULT); + final Properties kafkaProperties = new Properties(); + kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE)); + kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS)); + kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY)); + recordSender = TitanKafkaSenderFactory.forKafkaConfig( + kafkaBootstrapServers, + kafkaInputTopic, + schemaRegistryUrl); + LOGGER.info( + "Use Kafka as target with bootstrap server '{}', schema registry url '{}' and topic '{}'.", // NOCS + kafkaBootstrapServers, schemaRegistryUrl, kafkaInputTopic); + } else if (target == LoadGeneratorTarget.HTTP) { + final URI url = URI.create( + Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.HTTP_URL), + LoadGenerator.HTTP_URI_DEFAULT)); + recordSender = new HttpRecordSender<>(url); + LOGGER.info("Use HTTP server as target with url '{}'.", url); + } else if (target == LoadGeneratorTarget.PUBSUB) { + final String pubSubInputTopic = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.PUBSUB_INPUT_TOPIC), + LoadGenerator.PUBSUB_TOPIC_DEFAULT); + recordSender = TitanPubSubSenderFactory.forPubSubConfig(pubSubInputTopic); + LOGGER.info("Use Pub/Sub as target with topic '{}'.", pubSubInputTopic); + } else { + // Should never happen + throw new IllegalStateException("Target " + target + " is not handled yet."); + } + return recordSender; + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java index 6453ef0bd3b6d5a3b5f7f2b77fa20da8f79cb35f..64a3e763c262e3300cefd9f7c81a9db5b251403b 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java @@ -1,32 +1,28 @@ package theodolite.commons.workloadgeneration; -import java.net.URI; import java.time.Duration; import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import titan.ccp.model.records.ActivePowerRecord; /** * A Theodolite load generator. */ public final class LoadGenerator { - private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - - private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; - private static final String SENSOR_PREFIX_DEFAULT = "s_"; - private static final int NUMBER_OF_KEYS_DEFAULT = 10; - private static final int PERIOD_MS_DEFAULT = 1000; - private static final int VALUE_DEFAULT = 10; - private static final int THREADS_DEFAULT = 4; - private static final LoadGeneratorTarget TARGET_DEFAULT = LoadGeneratorTarget.KAFKA; - private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; - private static final String KAFKA_TOPIC_DEFAULT = "input"; - private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD - private static final String HTTP_URI_DEFAULT = "http://localhost:8080"; + public static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; + public static final String SENSOR_PREFIX_DEFAULT = "s_"; + public static final int NUMBER_OF_KEYS_DEFAULT = 10; + public static final int PERIOD_MS_DEFAULT = 1000; + public static final int VALUE_DEFAULT = 10; + public static final int THREADS_DEFAULT = 4; + public static final LoadGeneratorTarget TARGET_DEFAULT = LoadGeneratorTarget.KAFKA; + // Target: HTTP + public static final String HTTP_URI_DEFAULT = "http://localhost:8080"; + // Target: Kafka + public static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; + public static final String KAFKA_TOPIC_DEFAULT = "input"; // NOCS + public static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD + // Target: Pub/Sub + public static final String PUBSUB_TOPIC_DEFAULT = "input"; // NOCS private ClusterConfig clusterConfig; private WorkloadDefinition loadDefinition; @@ -106,101 +102,7 @@ public final class LoadGenerator { * Create a basic {@link LoadGenerator} from environment variables. */ public static LoadGenerator fromEnvironment() { - final String bootstrapServer = System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER); - final String kubernetesDnsName = System.getenv(ConfigurationKeys.KUBERNETES_DNS_NAME); - - ClusterConfig clusterConfig; - if (bootstrapServer != null) { // NOPMD - clusterConfig = ClusterConfig.fromBootstrapServer(bootstrapServer); - LOGGER.info("Use bootstrap server '{}'.", bootstrapServer); - } else if (kubernetesDnsName != null) { // NOPMD - clusterConfig = ClusterConfig.fromKubernetesDnsName(kubernetesDnsName); - LOGGER.info("Use Kubernetes DNS name '{}'.", kubernetesDnsName); - } else { - clusterConfig = ClusterConfig.fromBootstrapServer(BOOTSTRAP_SERVER_DEFAULT); - LOGGER.info( - "Neither a bootstrap server nor a Kubernetes DNS name was provided. Use default bootstrap server '{}'.", // NOCS - BOOTSTRAP_SERVER_DEFAULT); - } - - final String port = System.getenv(ConfigurationKeys.PORT); - if (port != null) { - clusterConfig.setPort(Integer.parseInt(port)); - } - - final String portAutoIncrement = System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT); - if (portAutoIncrement != null) { - clusterConfig.setPortAutoIncrement(Boolean.parseBoolean(portAutoIncrement)); - } - - final String clusterNamePrefix = System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX); - if (clusterNamePrefix != null) { - clusterConfig.setClusterNamePrefix(portAutoIncrement); - } - - final LoadGeneratorTarget target = LoadGeneratorTarget.from( - Objects.requireNonNullElse(System.getenv(ConfigurationKeys.TARGET), - TARGET_DEFAULT.getValue())); - - final RecordSender<ActivePowerRecord> recordSender; // NOPMD - if (target == LoadGeneratorTarget.KAFKA) { - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - KAFKA_BOOTSTRAP_SERVERS_DEFAULT); - final String kafkaInputTopic = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), - KAFKA_TOPIC_DEFAULT); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - SCHEMA_REGISTRY_URL_DEFAULT); - final Properties kafkaProperties = new Properties(); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE)); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS)); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY)); - recordSender = TitanKafkaSenderFactory.forKafkaConfig( - kafkaBootstrapServers, - kafkaInputTopic, - schemaRegistryUrl); - LOGGER.info( - "Use Kafka as target with bootstrap server '{}', schema registry url '{}' and topic '{}'.", // NOCS - kafkaBootstrapServers, schemaRegistryUrl, kafkaInputTopic); - } else if (target == LoadGeneratorTarget.HTTP) { - final URI url = URI.create( - Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.HTTP_URL), - HTTP_URI_DEFAULT)); - recordSender = new HttpRecordSender<>(url); - LOGGER.info("Use HTTP server as target with url '{}'.", url); - } else { - // Should never happen - throw new IllegalStateException("Target " + target + " is not handled yet."); - } - - final int numSensors = Integer.parseInt(Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.NUM_SENSORS), - Integer.toString(NUMBER_OF_KEYS_DEFAULT))); - final int periodMs = Integer.parseInt(Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.PERIOD_MS), - Integer.toString(PERIOD_MS_DEFAULT))); - final double value = Double.parseDouble(Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.VALUE), - Integer.toString(VALUE_DEFAULT))); - final int threads = Integer.parseInt(Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.THREADS), - Integer.toString(THREADS_DEFAULT))); - - return new LoadGenerator() - .setClusterConfig(clusterConfig) - .setLoadDefinition(new WorkloadDefinition( - new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors), - Duration.ofMillis(periodMs))) - .setGeneratorConfig(new LoadGeneratorConfig( - TitanRecordGenerator.forConstantValue(value), - recordSender)) - .withThreads(threads); + return new EnvVarLoadGeneratorFactory().create(); } } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java index 086e4de36301693c6873016122a47709b858a0d4..61ae6e86d1be63ca1f4ae3c362122235bb4662f0 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java @@ -4,7 +4,7 @@ import java.util.stream.Stream; enum LoadGeneratorTarget { - KAFKA("kafka"), HTTP("http"); + HTTP("http"), KAFKA("kafka"), PUBSUB("pubsub"); private final String value; diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java index 02bf1401aae49f726546aec399856c887470cda3..f18fc5b7c62b85f5467e12fc82646ed60a01045a 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java @@ -6,23 +6,18 @@ import com.google.protobuf.util.Timestamps; import com.google.pubsub.v1.PubsubMessage; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.apache.avro.specific.SpecificRecord; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Sends monitoring records to Kafka. * * @param <T> {@link SpecificRecord} to send */ -public class PubSubRecordSender<T extends SpecificRecord> implements RecordSender<T> { +public class PubSubRecordSender<T> implements RecordSender<T> { private static final int SHUTDOWN_TIMEOUT_SEC = 5; @@ -30,13 +25,11 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende private final String topic; - private final Function<T, String> keyAccessor; + private final Function<T, ByteBuffer> recordSerializer; private final Function<T, Long> timestampAccessor; - private final Function<T, ByteBuffer> recordSerializer; - - private final Producer<String, T> producer; // TODO remove + private final Function<T, String> orderingKeyAccessor; private final Publisher publisher; @@ -45,59 +38,20 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende */ private PubSubRecordSender(final Builder<T> builder) { this.topic = builder.topic; - this.keyAccessor = builder.keyAccessor; + this.orderingKeyAccessor = builder.orderingKeyAccessor; this.timestampAccessor = builder.timestampAccessor; this.recordSerializer = builder.recordSerializer; - final Properties properties = new Properties(); - properties.putAll(builder.defaultProperties); - properties.put("bootstrap.servers", builder.bootstrapServers); - // properties.put("acks", this.acknowledges); - // properties.put("batch.size", this.batchSize); - // properties.put("linger.ms", this.lingerMs); - // properties.put("buffer.memory", this.bufferMemory); - - final SchemaRegistryAvroSerdeFactory avroSerdeFactory = - new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl); - this.producer = new KafkaProducer<>( - properties, - new StringSerializer(), - avroSerdeFactory.<T>forKeys().serializer()); - try { this.publisher = Publisher.newBuilder(this.topic).build(); } catch (final IOException e) { - // TODO Auto-generated catch block - // e.printStackTrace(); throw new IllegalStateException(e); } } /** - * Write the passed monitoring record to Kafka. + * Terminate this {@link PubSubRecordSender} and shutdown the underlying {@link Publisher}. */ - public void write(final T record) { - - // TODO fix this - final ByteBuffer byteBuffer = this.recordSerializer.apply(record); - // try { - // byteBuffer = ((ActivePowerRecord) monitoringRecord).toByteBuffer(); - // } catch (final IOException e1) { - // // TODO Auto-generated catch block - // e1.printStackTrace(); - // throw new IllegalStateException(e1); - // } - final ByteString data = ByteString.copyFrom(byteBuffer); - - final PubsubMessage message = PubsubMessage.newBuilder() - .setOrderingKey(this.keyAccessor.apply(record)) - .setPublishTime(Timestamps.fromMillis(this.timestampAccessor.apply(record))) - .setData(data) - .build(); - this.publisher.publish(message); - LOGGER.debug("Send message to PubSub topic {}: {}", this.topic, message); - } - public void terminate() { this.publisher.shutdown(); try { @@ -105,54 +59,51 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende } catch (final InterruptedException e) { throw new IllegalStateException(e); } - this.producer.close(); } @Override - public void send(final T message) { - this.write(message); + public void send(final T record) { + final ByteBuffer byteBuffer = this.recordSerializer.apply(record); + final ByteString data = ByteString.copyFrom(byteBuffer); + + final PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder().setData(data); + if (this.orderingKeyAccessor != null) { + messageBuilder.setOrderingKey(this.orderingKeyAccessor.apply(record)); + } + if (this.timestampAccessor != null) { + messageBuilder.setPublishTime(Timestamps.fromMillis(this.timestampAccessor.apply(record))); + } + this.publisher.publish(messageBuilder.build()); + LOGGER.debug("Send message to PubSub topic {}: {}", this.topic, messageBuilder); } - public static <T extends SpecificRecord> Builder<T> builder( - final String bootstrapServers, + public static <T> Builder<T> builder( final String topic, - final String schemaRegistryUrl) { - return new Builder<>(bootstrapServers, topic, schemaRegistryUrl); + final Function<T, ByteBuffer> recordSerializer) { + return new Builder<>(topic, recordSerializer); } /** - * Builder class to build a new {@link KafkaRecordSender}. + * Builder class to build a new {@link PubSubRecordSender}. * * @param <T> Type of the records that should later be send. */ - public static class Builder<T extends SpecificRecord> { + public static class Builder<T> { - private final String bootstrapServers; private final String topic; - private final String schemaRegistryUrl; - private Function<T, String> keyAccessor = x -> ""; // NOPMD - private Function<T, Long> timestampAccessor = x -> null; // NOPMD - // TODO - private Function<T, ByteBuffer> recordSerializer = null; // NOPMD - private Properties defaultProperties = new Properties(); // NOPMD + private final Function<T, ByteBuffer> recordSerializer; // NOPMD + private Function<T, Long> timestampAccessor = null; // NOPMD + private Function<T, String> orderingKeyAccessor = null; // NOPMD /** - * Creates a Builder object for a {@link KafkaRecordSender}. + * Creates a Builder object for a {@link PubSubRecordSender}. * - * @param bootstrapServers The Server to for accessing Kafka. * @param topic The topic where to write. - * @param schemaRegistryUrl URL to the schema registry for avro. + * @param recordSerializer A function serializing objects to {@link ByteBuffer}. */ - private Builder(final String bootstrapServers, final String topic, - final String schemaRegistryUrl) { - this.bootstrapServers = bootstrapServers; + private Builder(final String topic, final Function<T, ByteBuffer> recordSerializer) { this.topic = topic; - this.schemaRegistryUrl = schemaRegistryUrl; - } - - public Builder<T> keyAccessor(final Function<T, String> keyAccessor) { - this.keyAccessor = keyAccessor; - return this; + this.recordSerializer = recordSerializer; } public Builder<T> timestampAccessor(final Function<T, Long> timestampAccessor) { @@ -160,13 +111,8 @@ public class PubSubRecordSender<T extends SpecificRecord> implements RecordSende return this; } - public Builder<T> recordSerializer(final Function<T, ByteBuffer> recordSerializer) { - this.recordSerializer = recordSerializer; - return this; - } - - public Builder<T> defaultProperties(final Properties defaultProperties) { - this.defaultProperties = defaultProperties; + public Builder<T> orderingKeyAccessor(final Function<T, String> keyAccessor) { + this.orderingKeyAccessor = keyAccessor; return this; } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java index f5be8258b8852754a11b4aa36c4262d3e7d4daf2..45638d47779d656b604b5b4b1c01f90c2c7e3513 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java @@ -1,52 +1,30 @@ package theodolite.commons.workloadgeneration; import java.io.IOException; -import java.util.Properties; import titan.ccp.model.records.ActivePowerRecord; /** - * A factory for creating {@link KafkaRecordSender}s that sends Titan {@link ActivePowerRecord}s. + * A factory for creating {@link PubSubRecordSender}s that sends Titan {@link ActivePowerRecord}s. */ public final class TitanPubSubSenderFactory { private TitanPubSubSenderFactory() {} /** - * Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka + * Create a new {@link PubSubRecordSender} for {@link ActivePowerRecord}s for the given PubSub * configuration. */ - public static PubSubRecordSender<ActivePowerRecord> forKafkaConfig( - final String bootstrapServers, - final String topic, - final String schemaRegistryUrl) { - return forKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties()); - } - - /** - * Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka - * configuration. - */ - public static PubSubRecordSender<ActivePowerRecord> forKafkaConfig( - final String bootstrapServers, - final String topic, - final String schemaRegistryUrl, - final Properties properties) { + public static PubSubRecordSender<ActivePowerRecord> forPubSubConfig(final String topic) { return PubSubRecordSender - .<ActivePowerRecord>builder( - bootstrapServers, - topic, - schemaRegistryUrl) - .keyAccessor(r -> r.getIdentifier()) - .timestampAccessor(r -> r.getTimestamp()) - .recordSerializer(r -> { + .<ActivePowerRecord>builder(topic, r -> { try { return r.toByteBuffer(); } catch (final IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); throw new IllegalStateException(e); } }) + // .orderingKeyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) .build(); } }