Skip to content
Snippets Groups Projects
Commit fd366614 authored by Sören Henning's avatar Sören Henning
Browse files

Split record generator and sender in loadgenerator

parent b9e21dea
No related branches found
No related tags found
1 merge request!203Split record generator and sender in load generator
Pipeline #5390 passed
Showing
with 139 additions and 60 deletions
package theodolite.commons.workloadgeneration;
/**
* Interface representing a message generator, which sends messages for given keys to some
* destination.
* Interface representing a record generator action consisting of generating a record and sending
* it.
*/
@FunctionalInterface
public interface MessageGenerator {
interface GeneratorAction {
void generate(final String key);
public static <T> MessageGenerator from(
final RecordGenerator<T> generator,
final RecordSender<T> sender) {
public static <T> GeneratorAction from(
final RecordGenerator<? extends T> generator,
final RecordSender<? super T> sender) {
return key -> sender.send(generator.generate(key));
}
......
......@@ -53,6 +53,33 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender
avroSerdeFactory.<T>forKeys().serializer());
}
/**
* Write the passed monitoring record to Kafka.
*/
public void write(final T monitoringRecord) {
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
try {
this.producer.send(record);
} catch (final SerializationException e) {
LOGGER.warn(
"Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS
e);
}
}
public void terminate() {
this.producer.close();
}
@Override
public void send(final T message) {
this.write(message);
}
public static <T extends SpecificRecord> Builder<T> builder(
final String bootstrapServers,
final String topic,
......@@ -108,31 +135,4 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender
}
}
/**
* Write the passed monitoring record to Kafka.
*/
public void write(final T monitoringRecord) {
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
try {
this.producer.send(record);
} catch (final SerializationException e) {
LOGGER.warn(
"Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS
e);
}
}
public void terminate() {
this.producer.close();
}
@Override
public void send(final T message) {
this.write(message);
}
}
......@@ -91,12 +91,11 @@ public final class LoadGenerator {
new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT),
Duration.ofMillis(PERIOD_MS_DEFAULT)))
.setGeneratorConfig(new LoadGeneratorConfig(
TitanMessageGeneratorFactory
.withKafkaConfig(
KAFKA_BOOTSTRAP_SERVERS_DEFAULT,
KAFKA_TOPIC_DEFAULT,
SCHEMA_REGISTRY_URL_DEFAULT)
.forConstantValue(VALUE_DEFAULT)));
TitanRecordGeneratorFactory.forConstantValue(VALUE_DEFAULT),
TitanKafkaSenderFactory.forKafkaConfig(
KAFKA_BOOTSTRAP_SERVERS_DEFAULT,
KAFKA_TOPIC_DEFAULT,
SCHEMA_REGISTRY_URL_DEFAULT)));
}
/**
......@@ -170,13 +169,11 @@ public final class LoadGenerator {
new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors),
Duration.ofMillis(periodMs)))
.setGeneratorConfig(new LoadGeneratorConfig(
TitanMessageGeneratorFactory
.withKafkaConfig(
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl,
kafkaProperties)
.forConstantValue(value)))
TitanRecordGeneratorFactory.forConstantValue(value),
TitanKafkaSenderFactory.forKafkaConfig(
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl)))
.withThreads(threads);
}
......
......@@ -5,30 +5,24 @@ package theodolite.commons.workloadgeneration;
*/
public class LoadGeneratorConfig {
private final MessageGenerator messageGenerator;
private final GeneratorAction messageGenerator;
private BeforeAction beforeAction = BeforeAction.doNothing();
private int threads = 1;
public LoadGeneratorConfig(final MessageGenerator messageGenerator) {
this.messageGenerator = messageGenerator;
public <T> LoadGeneratorConfig(
final RecordGenerator<? extends T> generator,
final RecordSender<? super T> sender) {
this.messageGenerator = GeneratorAction.from(generator, sender);
}
public LoadGeneratorConfig(
final MessageGenerator messageGenerator,
public <T> LoadGeneratorConfig(
final RecordGenerator<? extends T> generator,
final RecordSender<? super T> sender,
final int threads) {
this.messageGenerator = messageGenerator;
this(generator, sender);
this.threads = threads;
}
public LoadGeneratorExecution buildLoadGeneratorExecution(
final WorkloadDefinition workloadDefinition) {
return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads);
}
public BeforeAction getBeforeAction() {
return this.beforeAction;
}
public void setThreads(final int threads) {
this.threads = threads;
}
......@@ -37,6 +31,13 @@ public class LoadGeneratorConfig {
this.beforeAction = beforeAction;
}
public BeforeAction getBeforeAction() {
return this.beforeAction;
}
public LoadGeneratorExecution buildLoadGeneratorExecution(
final WorkloadDefinition workloadDefinition) {
return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads);
}
}
......@@ -8,25 +8,25 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link LoadGeneratorExecution} represents the execution of load generator, i.e., it can be
* A {@link LoadGeneratorExecution} represents the execution of a load generator, i.e., it can be
* started and stopped.
*/
public class LoadGeneratorExecution {
class LoadGeneratorExecution {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGeneratorExecution.class);
private final Random random = new Random();
private final WorkloadDefinition workloadDefinition;
private final MessageGenerator messageGenerator;
private final GeneratorAction messageGenerator;
private final ScheduledExecutorService executor;
/**
* Create a new {@link LoadGeneratorExecution} for a given {@link WorkloadDefinition} and a
* {@link MessageGenerator}. Load is generated by the given number of threads.
* {@link GeneratorAction}. Load is generated by the given number of threads.
*/
public LoadGeneratorExecution(
final WorkloadDefinition workloadDefinition,
final MessageGenerator messageGenerator,
final GeneratorAction messageGenerator,
final int threads) {
this.workloadDefinition = workloadDefinition;
this.messageGenerator = messageGenerator;
......
......@@ -4,46 +4,33 @@ import java.util.Properties;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A factory for creating {@link MessageGenerator}s that creates Titan {@link ActivePowerRecord}s
* and sends them via Kafka.
* A factory for creating {@link KafkaRecordSender}s that sends Titan {@link ActivePowerRecord}s.
*/
public final class TitanMessageGeneratorFactory {
public final class TitanKafkaSenderFactory {
private final RecordSender<ActivePowerRecord> recordSender;
private TitanMessageGeneratorFactory(final RecordSender<ActivePowerRecord> recordSender) {
this.recordSender = recordSender;
}
private TitanKafkaSenderFactory() {}
/**
* Create a {@link MessageGenerator} that generates Titan {@link ActivePowerRecord}s with a
* constant value.
* Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka
* configuration.
*/
public MessageGenerator forConstantValue(final double value) {
return MessageGenerator.from(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value),
this.recordSender);
}
/**
* Create a new TitanMessageGeneratorFactory for the given Kafka configuration.
*/
public static TitanMessageGeneratorFactory withKafkaConfig(
public static KafkaRecordSender<ActivePowerRecord> forKafkaConfig(
final String bootstrapServers,
final String topic,
final String schemaRegistryUrl) {
return withKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties());
return forKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties());
}
/**
* Create a new TitanMessageGeneratorFactory for the given Kafka configuration.
* Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka
* configuration.
*/
public static TitanMessageGeneratorFactory withKafkaConfig(
public static KafkaRecordSender<ActivePowerRecord> forKafkaConfig(
final String bootstrapServers,
final String topic,
final String schemaRegistryUrl,
final Properties properties) {
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = KafkaRecordSender
return KafkaRecordSender
.<ActivePowerRecord>builder(
bootstrapServers,
topic,
......@@ -51,7 +38,5 @@ public final class TitanMessageGeneratorFactory {
.keyAccessor(r -> r.getIdentifier())
.timestampAccessor(r -> r.getTimestamp())
.build();
return new TitanMessageGeneratorFactory(kafkaRecordSender);
}
}
package theodolite.commons.workloadgeneration;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A factory for creating {@link RecordGenerator}s that creates Titan {@link ActivePowerRecord}s.
*/
public final class TitanRecordGeneratorFactory {
private TitanRecordGeneratorFactory() {}
/**
* Create a {@link RecordGenerator} that generates Titan {@link ActivePowerRecord}s with a
* constant value.
*/
public static RecordGenerator<ActivePowerRecord> forConstantValue(final double value) {
return sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment