Skip to content
Snippets Groups Projects
Commit 9b9fb7ee authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'split-record-generator-and-sender-in-load-generator' into 'master'

Split record generator and sender in load generator

Closes #280

See merge request !203
parents b9e21dea fd366614
No related branches found
No related tags found
1 merge request!203Split record generator and sender in load generator
Pipeline #5392 passed
Showing
with 139 additions and 60 deletions
package theodolite.commons.workloadgeneration; package theodolite.commons.workloadgeneration;
/** /**
* Interface representing a message generator, which sends messages for given keys to some * Interface representing a record generator action consisting of generating a record and sending
* destination. * it.
*/ */
@FunctionalInterface @FunctionalInterface
public interface MessageGenerator { interface GeneratorAction {
void generate(final String key); void generate(final String key);
public static <T> MessageGenerator from( public static <T> GeneratorAction from(
final RecordGenerator<T> generator, final RecordGenerator<? extends T> generator,
final RecordSender<T> sender) { final RecordSender<? super T> sender) {
return key -> sender.send(generator.generate(key)); return key -> sender.send(generator.generate(key));
} }
......
...@@ -53,6 +53,33 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender ...@@ -53,6 +53,33 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender
avroSerdeFactory.<T>forKeys().serializer()); avroSerdeFactory.<T>forKeys().serializer());
} }
/**
* Write the passed monitoring record to Kafka.
*/
public void write(final T monitoringRecord) {
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
try {
this.producer.send(record);
} catch (final SerializationException e) {
LOGGER.warn(
"Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS
e);
}
}
public void terminate() {
this.producer.close();
}
@Override
public void send(final T message) {
this.write(message);
}
public static <T extends SpecificRecord> Builder<T> builder( public static <T extends SpecificRecord> Builder<T> builder(
final String bootstrapServers, final String bootstrapServers,
final String topic, final String topic,
...@@ -108,31 +135,4 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender ...@@ -108,31 +135,4 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender
} }
} }
/**
* Write the passed monitoring record to Kafka.
*/
public void write(final T monitoringRecord) {
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
try {
this.producer.send(record);
} catch (final SerializationException e) {
LOGGER.warn(
"Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS
e);
}
}
public void terminate() {
this.producer.close();
}
@Override
public void send(final T message) {
this.write(message);
}
} }
...@@ -91,12 +91,11 @@ public final class LoadGenerator { ...@@ -91,12 +91,11 @@ public final class LoadGenerator {
new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT), new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT),
Duration.ofMillis(PERIOD_MS_DEFAULT))) Duration.ofMillis(PERIOD_MS_DEFAULT)))
.setGeneratorConfig(new LoadGeneratorConfig( .setGeneratorConfig(new LoadGeneratorConfig(
TitanMessageGeneratorFactory TitanRecordGeneratorFactory.forConstantValue(VALUE_DEFAULT),
.withKafkaConfig( TitanKafkaSenderFactory.forKafkaConfig(
KAFKA_BOOTSTRAP_SERVERS_DEFAULT, KAFKA_BOOTSTRAP_SERVERS_DEFAULT,
KAFKA_TOPIC_DEFAULT, KAFKA_TOPIC_DEFAULT,
SCHEMA_REGISTRY_URL_DEFAULT) SCHEMA_REGISTRY_URL_DEFAULT)));
.forConstantValue(VALUE_DEFAULT)));
} }
/** /**
...@@ -170,13 +169,11 @@ public final class LoadGenerator { ...@@ -170,13 +169,11 @@ public final class LoadGenerator {
new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors), new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors),
Duration.ofMillis(periodMs))) Duration.ofMillis(periodMs)))
.setGeneratorConfig(new LoadGeneratorConfig( .setGeneratorConfig(new LoadGeneratorConfig(
TitanMessageGeneratorFactory TitanRecordGeneratorFactory.forConstantValue(value),
.withKafkaConfig( TitanKafkaSenderFactory.forKafkaConfig(
kafkaBootstrapServers, kafkaBootstrapServers,
kafkaInputTopic, kafkaInputTopic,
schemaRegistryUrl, schemaRegistryUrl)))
kafkaProperties)
.forConstantValue(value)))
.withThreads(threads); .withThreads(threads);
} }
......
...@@ -5,30 +5,24 @@ package theodolite.commons.workloadgeneration; ...@@ -5,30 +5,24 @@ package theodolite.commons.workloadgeneration;
*/ */
public class LoadGeneratorConfig { public class LoadGeneratorConfig {
private final MessageGenerator messageGenerator; private final GeneratorAction messageGenerator;
private BeforeAction beforeAction = BeforeAction.doNothing(); private BeforeAction beforeAction = BeforeAction.doNothing();
private int threads = 1; private int threads = 1;
public LoadGeneratorConfig(final MessageGenerator messageGenerator) { public <T> LoadGeneratorConfig(
this.messageGenerator = messageGenerator; final RecordGenerator<? extends T> generator,
final RecordSender<? super T> sender) {
this.messageGenerator = GeneratorAction.from(generator, sender);
} }
public LoadGeneratorConfig( public <T> LoadGeneratorConfig(
final MessageGenerator messageGenerator, final RecordGenerator<? extends T> generator,
final RecordSender<? super T> sender,
final int threads) { final int threads) {
this.messageGenerator = messageGenerator; this(generator, sender);
this.threads = threads; this.threads = threads;
} }
public LoadGeneratorExecution buildLoadGeneratorExecution(
final WorkloadDefinition workloadDefinition) {
return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads);
}
public BeforeAction getBeforeAction() {
return this.beforeAction;
}
public void setThreads(final int threads) { public void setThreads(final int threads) {
this.threads = threads; this.threads = threads;
} }
...@@ -37,6 +31,13 @@ public class LoadGeneratorConfig { ...@@ -37,6 +31,13 @@ public class LoadGeneratorConfig {
this.beforeAction = beforeAction; this.beforeAction = beforeAction;
} }
public BeforeAction getBeforeAction() {
return this.beforeAction;
}
public LoadGeneratorExecution buildLoadGeneratorExecution(
final WorkloadDefinition workloadDefinition) {
return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads);
}
} }
...@@ -8,25 +8,25 @@ import org.slf4j.Logger; ...@@ -8,25 +8,25 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* A {@link LoadGeneratorExecution} represents the execution of load generator, i.e., it can be * A {@link LoadGeneratorExecution} represents the execution of a load generator, i.e., it can be
* started and stopped. * started and stopped.
*/ */
public class LoadGeneratorExecution { class LoadGeneratorExecution {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGeneratorExecution.class); private static final Logger LOGGER = LoggerFactory.getLogger(LoadGeneratorExecution.class);
private final Random random = new Random(); private final Random random = new Random();
private final WorkloadDefinition workloadDefinition; private final WorkloadDefinition workloadDefinition;
private final MessageGenerator messageGenerator; private final GeneratorAction messageGenerator;
private final ScheduledExecutorService executor; private final ScheduledExecutorService executor;
/** /**
* Create a new {@link LoadGeneratorExecution} for a given {@link WorkloadDefinition} and a * Create a new {@link LoadGeneratorExecution} for a given {@link WorkloadDefinition} and a
* {@link MessageGenerator}. Load is generated by the given number of threads. * {@link GeneratorAction}. Load is generated by the given number of threads.
*/ */
public LoadGeneratorExecution( public LoadGeneratorExecution(
final WorkloadDefinition workloadDefinition, final WorkloadDefinition workloadDefinition,
final MessageGenerator messageGenerator, final GeneratorAction messageGenerator,
final int threads) { final int threads) {
this.workloadDefinition = workloadDefinition; this.workloadDefinition = workloadDefinition;
this.messageGenerator = messageGenerator; this.messageGenerator = messageGenerator;
......
...@@ -4,46 +4,33 @@ import java.util.Properties; ...@@ -4,46 +4,33 @@ import java.util.Properties;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/** /**
* A factory for creating {@link MessageGenerator}s that creates Titan {@link ActivePowerRecord}s * A factory for creating {@link KafkaRecordSender}s that sends Titan {@link ActivePowerRecord}s.
* and sends them via Kafka.
*/ */
public final class TitanMessageGeneratorFactory { public final class TitanKafkaSenderFactory {
private final RecordSender<ActivePowerRecord> recordSender; private TitanKafkaSenderFactory() {}
private TitanMessageGeneratorFactory(final RecordSender<ActivePowerRecord> recordSender) {
this.recordSender = recordSender;
}
/** /**
* Create a {@link MessageGenerator} that generates Titan {@link ActivePowerRecord}s with a * Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka
* constant value. * configuration.
*/ */
public MessageGenerator forConstantValue(final double value) { public static KafkaRecordSender<ActivePowerRecord> forKafkaConfig(
return MessageGenerator.from(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value),
this.recordSender);
}
/**
* Create a new TitanMessageGeneratorFactory for the given Kafka configuration.
*/
public static TitanMessageGeneratorFactory withKafkaConfig(
final String bootstrapServers, final String bootstrapServers,
final String topic, final String topic,
final String schemaRegistryUrl) { final String schemaRegistryUrl) {
return withKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties()); return forKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties());
} }
/** /**
* Create a new TitanMessageGeneratorFactory for the given Kafka configuration. * Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka
* configuration.
*/ */
public static TitanMessageGeneratorFactory withKafkaConfig( public static KafkaRecordSender<ActivePowerRecord> forKafkaConfig(
final String bootstrapServers, final String bootstrapServers,
final String topic, final String topic,
final String schemaRegistryUrl, final String schemaRegistryUrl,
final Properties properties) { final Properties properties) {
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = KafkaRecordSender return KafkaRecordSender
.<ActivePowerRecord>builder( .<ActivePowerRecord>builder(
bootstrapServers, bootstrapServers,
topic, topic,
...@@ -51,7 +38,5 @@ public final class TitanMessageGeneratorFactory { ...@@ -51,7 +38,5 @@ public final class TitanMessageGeneratorFactory {
.keyAccessor(r -> r.getIdentifier()) .keyAccessor(r -> r.getIdentifier())
.timestampAccessor(r -> r.getTimestamp()) .timestampAccessor(r -> r.getTimestamp())
.build(); .build();
return new TitanMessageGeneratorFactory(kafkaRecordSender);
} }
} }
package theodolite.commons.workloadgeneration;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A factory for creating {@link RecordGenerator}s that creates Titan {@link ActivePowerRecord}s.
*/
public final class TitanRecordGeneratorFactory {
private TitanRecordGeneratorFactory() {}
/**
* Create a {@link RecordGenerator} that generates Titan {@link ActivePowerRecord}s with a
* constant value.
*/
public static RecordGenerator<ActivePowerRecord> forConstantValue(final double value) {
return sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment