Skip to content
Snippets Groups Projects
Commit f175b6d3 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'master' of git.se.informatik.uni-kiel.de:she/theodolite

parents fef26208 9b9fb7ee
No related branches found
No related tags found
No related merge requests found
Pipeline #5402 passed
Showing
with 139 additions and 60 deletions
package theodolite.commons.workloadgeneration;
/**
* Interface representing a message generator, which sends messages for given keys to some
* destination.
* Interface representing a record generator action consisting of generating a record and sending
* it.
*/
@FunctionalInterface
public interface MessageGenerator {
interface GeneratorAction {
void generate(final String key);
public static <T> MessageGenerator from(
final RecordGenerator<T> generator,
final RecordSender<T> sender) {
public static <T> GeneratorAction from(
final RecordGenerator<? extends T> generator,
final RecordSender<? super T> sender) {
return key -> sender.send(generator.generate(key));
}
......
......@@ -53,6 +53,33 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender
avroSerdeFactory.<T>forKeys().serializer());
}
/**
* Write the passed monitoring record to Kafka.
*/
public void write(final T monitoringRecord) {
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
try {
this.producer.send(record);
} catch (final SerializationException e) {
LOGGER.warn(
"Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS
e);
}
}
public void terminate() {
this.producer.close();
}
@Override
public void send(final T message) {
this.write(message);
}
public static <T extends SpecificRecord> Builder<T> builder(
final String bootstrapServers,
final String topic,
......@@ -108,31 +135,4 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender
}
}
/**
* Write the passed monitoring record to Kafka.
*/
public void write(final T monitoringRecord) {
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
try {
this.producer.send(record);
} catch (final SerializationException e) {
LOGGER.warn(
"Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS
e);
}
}
public void terminate() {
this.producer.close();
}
@Override
public void send(final T message) {
this.write(message);
}
}
......@@ -91,12 +91,11 @@ public final class LoadGenerator {
new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT),
Duration.ofMillis(PERIOD_MS_DEFAULT)))
.setGeneratorConfig(new LoadGeneratorConfig(
TitanMessageGeneratorFactory
.withKafkaConfig(
KAFKA_BOOTSTRAP_SERVERS_DEFAULT,
KAFKA_TOPIC_DEFAULT,
SCHEMA_REGISTRY_URL_DEFAULT)
.forConstantValue(VALUE_DEFAULT)));
TitanRecordGeneratorFactory.forConstantValue(VALUE_DEFAULT),
TitanKafkaSenderFactory.forKafkaConfig(
KAFKA_BOOTSTRAP_SERVERS_DEFAULT,
KAFKA_TOPIC_DEFAULT,
SCHEMA_REGISTRY_URL_DEFAULT)));
}
/**
......@@ -170,13 +169,11 @@ public final class LoadGenerator {
new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors),
Duration.ofMillis(periodMs)))
.setGeneratorConfig(new LoadGeneratorConfig(
TitanMessageGeneratorFactory
.withKafkaConfig(
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl,
kafkaProperties)
.forConstantValue(value)))
TitanRecordGeneratorFactory.forConstantValue(value),
TitanKafkaSenderFactory.forKafkaConfig(
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl)))
.withThreads(threads);
}
......
......@@ -5,30 +5,24 @@ package theodolite.commons.workloadgeneration;
*/
public class LoadGeneratorConfig {
private final MessageGenerator messageGenerator;
private final GeneratorAction messageGenerator;
private BeforeAction beforeAction = BeforeAction.doNothing();
private int threads = 1;
public LoadGeneratorConfig(final MessageGenerator messageGenerator) {
this.messageGenerator = messageGenerator;
public <T> LoadGeneratorConfig(
final RecordGenerator<? extends T> generator,
final RecordSender<? super T> sender) {
this.messageGenerator = GeneratorAction.from(generator, sender);
}
public LoadGeneratorConfig(
final MessageGenerator messageGenerator,
public <T> LoadGeneratorConfig(
final RecordGenerator<? extends T> generator,
final RecordSender<? super T> sender,
final int threads) {
this.messageGenerator = messageGenerator;
this(generator, sender);
this.threads = threads;
}
public LoadGeneratorExecution buildLoadGeneratorExecution(
final WorkloadDefinition workloadDefinition) {
return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads);
}
public BeforeAction getBeforeAction() {
return this.beforeAction;
}
public void setThreads(final int threads) {
this.threads = threads;
}
......@@ -37,6 +31,13 @@ public class LoadGeneratorConfig {
this.beforeAction = beforeAction;
}
public BeforeAction getBeforeAction() {
return this.beforeAction;
}
public LoadGeneratorExecution buildLoadGeneratorExecution(
final WorkloadDefinition workloadDefinition) {
return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads);
}
}
......@@ -8,25 +8,25 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link LoadGeneratorExecution} represents the execution of load generator, i.e., it can be
* A {@link LoadGeneratorExecution} represents the execution of a load generator, i.e., it can be
* started and stopped.
*/
public class LoadGeneratorExecution {
class LoadGeneratorExecution {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGeneratorExecution.class);
private final Random random = new Random();
private final WorkloadDefinition workloadDefinition;
private final MessageGenerator messageGenerator;
private final GeneratorAction messageGenerator;
private final ScheduledExecutorService executor;
/**
* Create a new {@link LoadGeneratorExecution} for a given {@link WorkloadDefinition} and a
* {@link MessageGenerator}. Load is generated by the given number of threads.
* {@link GeneratorAction}. Load is generated by the given number of threads.
*/
public LoadGeneratorExecution(
final WorkloadDefinition workloadDefinition,
final MessageGenerator messageGenerator,
final GeneratorAction messageGenerator,
final int threads) {
this.workloadDefinition = workloadDefinition;
this.messageGenerator = messageGenerator;
......
......@@ -4,46 +4,33 @@ import java.util.Properties;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A factory for creating {@link MessageGenerator}s that creates Titan {@link ActivePowerRecord}s
* and sends them via Kafka.
* A factory for creating {@link KafkaRecordSender}s that sends Titan {@link ActivePowerRecord}s.
*/
public final class TitanMessageGeneratorFactory {
public final class TitanKafkaSenderFactory {
private final RecordSender<ActivePowerRecord> recordSender;
private TitanMessageGeneratorFactory(final RecordSender<ActivePowerRecord> recordSender) {
this.recordSender = recordSender;
}
private TitanKafkaSenderFactory() {}
/**
* Create a {@link MessageGenerator} that generates Titan {@link ActivePowerRecord}s with a
* constant value.
* Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka
* configuration.
*/
public MessageGenerator forConstantValue(final double value) {
return MessageGenerator.from(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value),
this.recordSender);
}
/**
* Create a new TitanMessageGeneratorFactory for the given Kafka configuration.
*/
public static TitanMessageGeneratorFactory withKafkaConfig(
public static KafkaRecordSender<ActivePowerRecord> forKafkaConfig(
final String bootstrapServers,
final String topic,
final String schemaRegistryUrl) {
return withKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties());
return forKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties());
}
/**
* Create a new TitanMessageGeneratorFactory for the given Kafka configuration.
* Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka
* configuration.
*/
public static TitanMessageGeneratorFactory withKafkaConfig(
public static KafkaRecordSender<ActivePowerRecord> forKafkaConfig(
final String bootstrapServers,
final String topic,
final String schemaRegistryUrl,
final Properties properties) {
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = KafkaRecordSender
return KafkaRecordSender
.<ActivePowerRecord>builder(
bootstrapServers,
topic,
......@@ -51,7 +38,5 @@ public final class TitanMessageGeneratorFactory {
.keyAccessor(r -> r.getIdentifier())
.timestampAccessor(r -> r.getTimestamp())
.build();
return new TitanMessageGeneratorFactory(kafkaRecordSender);
}
}
package theodolite.commons.workloadgeneration;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A factory for creating {@link RecordGenerator}s that creates Titan {@link ActivePowerRecord}s.
*/
public final class TitanRecordGeneratorFactory {
private TitanRecordGeneratorFactory() {}
/**
* Create a {@link RecordGenerator} that generates Titan {@link ActivePowerRecord}s with a
* constant value.
*/
public static RecordGenerator<ActivePowerRecord> forConstantValue(final double value) {
return sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment