From fd36661485cd0ed8a92c4743e2680d97ae227443 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Thu, 2 Dec 2021 17:45:52 +0100 Subject: [PATCH] Split record generator and sender in loadgenerator --- .../workloadgeneration/GeneratorAction.java | 18 ++++++ .../workloadgeneration/KafkaRecordSender.java | 54 +++++++++--------- .../workloadgeneration/LoadGenerator.java | 23 ++++---- .../LoadGeneratorConfig.java | 31 +++++----- .../LoadGeneratorExecution.java | 10 ++-- .../workloadgeneration/MessageGenerator.java | 18 ------ .../TitanKafkaSenderFactory.java | 42 ++++++++++++++ .../TitanMessageGeneratorFactory.java | 57 ------------------- .../TitanRecordGeneratorFactory.java | 21 +++++++ 9 files changed, 139 insertions(+), 135 deletions(-) create mode 100644 theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/GeneratorAction.java delete mode 100644 theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/MessageGenerator.java create mode 100644 theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanKafkaSenderFactory.java delete mode 100644 theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageGeneratorFactory.java create mode 100644 theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanRecordGeneratorFactory.java diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/GeneratorAction.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/GeneratorAction.java new file mode 100644 index 000000000..11a9cbf2d --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/GeneratorAction.java @@ -0,0 +1,18 @@ +package theodolite.commons.workloadgeneration; + +/** + * Interface representing a record generator action consisting of generating a record and sending + * it. + */ +@FunctionalInterface +interface GeneratorAction { + + void generate(final String key); + + public static <T> GeneratorAction from( + final RecordGenerator<? extends T> generator, + final RecordSender<? super T> sender) { + return key -> sender.send(generator.generate(key)); + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java index 6e4a43271..ded7c347c 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java @@ -53,6 +53,33 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender avroSerdeFactory.<T>forKeys().serializer()); } + /** + * Write the passed monitoring record to Kafka. + */ + public void write(final T monitoringRecord) { + final ProducerRecord<String, T> record = + new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord), + this.keyAccessor.apply(monitoringRecord), monitoringRecord); + + LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); + try { + this.producer.send(record); + } catch (final SerializationException e) { + LOGGER.warn( + "Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS + e); + } + } + + public void terminate() { + this.producer.close(); + } + + @Override + public void send(final T message) { + this.write(message); + } + public static <T extends SpecificRecord> Builder<T> builder( final String bootstrapServers, final String topic, @@ -108,31 +135,4 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender } } - /** - * Write the passed monitoring record to Kafka. - */ - public void write(final T monitoringRecord) { - final ProducerRecord<String, T> record = - new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord), - this.keyAccessor.apply(monitoringRecord), monitoringRecord); - - LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); - try { - this.producer.send(record); - } catch (final SerializationException e) { - LOGGER.warn( - "Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS - e); - } - } - - public void terminate() { - this.producer.close(); - } - - @Override - public void send(final T message) { - this.write(message); - } - } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java index a9a1ce65a..73f064d1c 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java @@ -91,12 +91,11 @@ public final class LoadGenerator { new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT), Duration.ofMillis(PERIOD_MS_DEFAULT))) .setGeneratorConfig(new LoadGeneratorConfig( - TitanMessageGeneratorFactory - .withKafkaConfig( - KAFKA_BOOTSTRAP_SERVERS_DEFAULT, - KAFKA_TOPIC_DEFAULT, - SCHEMA_REGISTRY_URL_DEFAULT) - .forConstantValue(VALUE_DEFAULT))); + TitanRecordGeneratorFactory.forConstantValue(VALUE_DEFAULT), + TitanKafkaSenderFactory.forKafkaConfig( + KAFKA_BOOTSTRAP_SERVERS_DEFAULT, + KAFKA_TOPIC_DEFAULT, + SCHEMA_REGISTRY_URL_DEFAULT))); } /** @@ -170,13 +169,11 @@ public final class LoadGenerator { new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors), Duration.ofMillis(periodMs))) .setGeneratorConfig(new LoadGeneratorConfig( - TitanMessageGeneratorFactory - .withKafkaConfig( - kafkaBootstrapServers, - kafkaInputTopic, - schemaRegistryUrl, - kafkaProperties) - .forConstantValue(value))) + TitanRecordGeneratorFactory.forConstantValue(value), + TitanKafkaSenderFactory.forKafkaConfig( + kafkaBootstrapServers, + kafkaInputTopic, + schemaRegistryUrl))) .withThreads(threads); } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java index 2e907d8e9..4b5fea3e4 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java @@ -5,30 +5,24 @@ package theodolite.commons.workloadgeneration; */ public class LoadGeneratorConfig { - private final MessageGenerator messageGenerator; + private final GeneratorAction messageGenerator; private BeforeAction beforeAction = BeforeAction.doNothing(); private int threads = 1; - public LoadGeneratorConfig(final MessageGenerator messageGenerator) { - this.messageGenerator = messageGenerator; + public <T> LoadGeneratorConfig( + final RecordGenerator<? extends T> generator, + final RecordSender<? super T> sender) { + this.messageGenerator = GeneratorAction.from(generator, sender); } - public LoadGeneratorConfig( - final MessageGenerator messageGenerator, + public <T> LoadGeneratorConfig( + final RecordGenerator<? extends T> generator, + final RecordSender<? super T> sender, final int threads) { - this.messageGenerator = messageGenerator; + this(generator, sender); this.threads = threads; } - public LoadGeneratorExecution buildLoadGeneratorExecution( - final WorkloadDefinition workloadDefinition) { - return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads); - } - - public BeforeAction getBeforeAction() { - return this.beforeAction; - } - public void setThreads(final int threads) { this.threads = threads; } @@ -37,6 +31,13 @@ public class LoadGeneratorConfig { this.beforeAction = beforeAction; } + public BeforeAction getBeforeAction() { + return this.beforeAction; + } + public LoadGeneratorExecution buildLoadGeneratorExecution( + final WorkloadDefinition workloadDefinition) { + return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads); + } } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java index 3934c3d34..e1a2a7e1b 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java @@ -8,25 +8,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A {@link LoadGeneratorExecution} represents the execution of load generator, i.e., it can be + * A {@link LoadGeneratorExecution} represents the execution of a load generator, i.e., it can be * started and stopped. */ -public class LoadGeneratorExecution { +class LoadGeneratorExecution { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGeneratorExecution.class); private final Random random = new Random(); private final WorkloadDefinition workloadDefinition; - private final MessageGenerator messageGenerator; + private final GeneratorAction messageGenerator; private final ScheduledExecutorService executor; /** * Create a new {@link LoadGeneratorExecution} for a given {@link WorkloadDefinition} and a - * {@link MessageGenerator}. Load is generated by the given number of threads. + * {@link GeneratorAction}. Load is generated by the given number of threads. */ public LoadGeneratorExecution( final WorkloadDefinition workloadDefinition, - final MessageGenerator messageGenerator, + final GeneratorAction messageGenerator, final int threads) { this.workloadDefinition = workloadDefinition; this.messageGenerator = messageGenerator; diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/MessageGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/MessageGenerator.java deleted file mode 100644 index c369f1655..000000000 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/MessageGenerator.java +++ /dev/null @@ -1,18 +0,0 @@ -package theodolite.commons.workloadgeneration; - -/** - * Interface representing a message generator, which sends messages for given keys to some - * destination. - */ -@FunctionalInterface -public interface MessageGenerator { - - void generate(final String key); - - public static <T> MessageGenerator from( - final RecordGenerator<T> generator, - final RecordSender<T> sender) { - return key -> sender.send(generator.generate(key)); - } - -} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanKafkaSenderFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanKafkaSenderFactory.java new file mode 100644 index 000000000..0cdf8d91e --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanKafkaSenderFactory.java @@ -0,0 +1,42 @@ +package theodolite.commons.workloadgeneration; + +import java.util.Properties; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A factory for creating {@link KafkaRecordSender}s that sends Titan {@link ActivePowerRecord}s. + */ +public final class TitanKafkaSenderFactory { + + private TitanKafkaSenderFactory() {} + + /** + * Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka + * configuration. + */ + public static KafkaRecordSender<ActivePowerRecord> forKafkaConfig( + final String bootstrapServers, + final String topic, + final String schemaRegistryUrl) { + return forKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties()); + } + + /** + * Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka + * configuration. + */ + public static KafkaRecordSender<ActivePowerRecord> forKafkaConfig( + final String bootstrapServers, + final String topic, + final String schemaRegistryUrl, + final Properties properties) { + return KafkaRecordSender + .<ActivePowerRecord>builder( + bootstrapServers, + topic, + schemaRegistryUrl) + .keyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) + .build(); + } +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageGeneratorFactory.java deleted file mode 100644 index bd0b41d4e..000000000 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageGeneratorFactory.java +++ /dev/null @@ -1,57 +0,0 @@ -package theodolite.commons.workloadgeneration; - -import java.util.Properties; -import titan.ccp.model.records.ActivePowerRecord; - -/** - * A factory for creating {@link MessageGenerator}s that creates Titan {@link ActivePowerRecord}s - * and sends them via Kafka. - */ -public final class TitanMessageGeneratorFactory { - - private final RecordSender<ActivePowerRecord> recordSender; - - private TitanMessageGeneratorFactory(final RecordSender<ActivePowerRecord> recordSender) { - this.recordSender = recordSender; - } - - /** - * Create a {@link MessageGenerator} that generates Titan {@link ActivePowerRecord}s with a - * constant value. - */ - public MessageGenerator forConstantValue(final double value) { - return MessageGenerator.from( - sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value), - this.recordSender); - } - - /** - * Create a new TitanMessageGeneratorFactory for the given Kafka configuration. - */ - public static TitanMessageGeneratorFactory withKafkaConfig( - final String bootstrapServers, - final String topic, - final String schemaRegistryUrl) { - return withKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties()); - } - - /** - * Create a new TitanMessageGeneratorFactory for the given Kafka configuration. - */ - public static TitanMessageGeneratorFactory withKafkaConfig( - final String bootstrapServers, - final String topic, - final String schemaRegistryUrl, - final Properties properties) { - final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = KafkaRecordSender - .<ActivePowerRecord>builder( - bootstrapServers, - topic, - schemaRegistryUrl) - .keyAccessor(r -> r.getIdentifier()) - .timestampAccessor(r -> r.getTimestamp()) - .build(); - return new TitanMessageGeneratorFactory(kafkaRecordSender); - } - -} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanRecordGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanRecordGeneratorFactory.java new file mode 100644 index 000000000..4e1c10071 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanRecordGeneratorFactory.java @@ -0,0 +1,21 @@ +package theodolite.commons.workloadgeneration; + +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A factory for creating {@link RecordGenerator}s that creates Titan {@link ActivePowerRecord}s. + */ +public final class TitanRecordGeneratorFactory { + + + private TitanRecordGeneratorFactory() {} + + /** + * Create a {@link RecordGenerator} that generates Titan {@link ActivePowerRecord}s with a + * constant value. + */ + public static RecordGenerator<ActivePowerRecord> forConstantValue(final double value) { + return sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value); + } + +} -- GitLab