Compare revisions

Showing 912 additions and 255 deletions
package theodolite.kafkasender;
import java.util.Properties;
import java.util.function.Function;
import kieker.common.record.IMonitoringRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
/**
* Sends monitoring records to Kafka.
*
* @param <T> {@link IMonitoringRecord} to send
*/
public class KafkaRecordSender<T extends IMonitoringRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
private final String topic;
private final Function<T, String> keyAccessor;
private final Function<T, Long> timestampAccessor;
private final Producer<String, T> producer;
public KafkaRecordSender(final String bootstrapServers, final String topic) {
this(bootstrapServers, topic, x -> "", x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor) {
this(bootstrapServers, topic, keyAccessor, x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) {
this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties());
}
/**
* Create a new {@link KafkaRecordSender}.
*/
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor,
final Properties defaultProperties) {
this.topic = topic;
this.keyAccessor = keyAccessor;
this.timestampAccessor = timestampAccessor;
final Properties properties = new Properties();
properties.putAll(defaultProperties);
properties.put("bootstrap.servers", bootstrapServers);
// properties.put("acks", this.acknowledges);
// properties.put("batch.size", this.batchSize);
// properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory);
this.producer = new KafkaProducer<>(properties, new StringSerializer(),
IMonitoringRecordSerde.serializer());
}
/**
* Write the passed monitoring record to Kafka.
*/
public void write(final T monitoringRecord) {
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
this.producer.send(record);
}
public void terminate() {
this.producer.close();
}
}
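For orientation, a minimal usage sketch of the sender above; the broker address, topic and record values are placeholders, and the record constructor mirrors the call in the load generator below.

import theodolite.kafkasender.KafkaRecordSender;
import titan.ccp.models.records.ActivePowerRecord;

public final class KafkaRecordSenderExample {
  public static void main(final String[] args) {
    // Placeholder broker and topic; key and timestamp are read from the record.
    final KafkaRecordSender<ActivePowerRecord> sender = new KafkaRecordSender<>(
        "localhost:9092",
        "input",
        r -> r.getIdentifier(),
        r -> r.getTimestamp());
    // Send one record and close the underlying producer.
    sender.write(new ActivePowerRecord("s_0", System.currentTimeMillis(), 10));
    sender.terminate();
  }
}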
package theodolite.uc3.workloadgenerator;
import java.io.IOException;
import java.util.List;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import theodolite.kafkasender.KafkaRecordSender;
-import titan.ccp.models.records.ActivePowerRecord;
+import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
+import theodolite.commons.workloadgeneration.dimensions.KeySpace;
+import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
+import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
+import theodolite.commons.workloadgeneration.misc.ZooKeeper;
+import titan.ccp.model.records.ActivePowerRecord;
-public class LoadGenerator {
+/**
+ * The {@code LoadGenerator} creates a load in Kafka.
+ */
+public final class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
-private static final int WL_MAX_RECORDS = 150_000;
// constants
private static final long MAX_DURATION_IN_DAYS = 30L;
// Make this a utility class, because all methods are static.
private LoadGenerator() {
throw new UnsupportedOperationException();
}
/**
* Main method.
*
* @param args CLI arguments
* @throws InterruptedException if the workload generation is interrupted
* @throws IOException if an I/O error occurs
*/
public static void main(final String[] args) throws InterruptedException, IOException {
// uc3
LOGGER.info("Start workload generator for use case UC3.");
+// get environment variables
+final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
+final int zooKeeperPort =
+Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
final int numSensors =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
-final int instanceId = getInstanceId();
final int periodMs =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
-final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
+final double value =
+Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
final String kafkaBootstrapServers =
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
"localhost:9092");
+final String schemaRegistryUrl =
+Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091");
final String kafkaInputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
+final int instances =
+Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
-final int idStart = instanceId * WL_MAX_RECORDS;
-final int idEnd = Math.min((instanceId + 1) * WL_MAX_RECORDS, numSensors);
-LOGGER.info("Generating data for sensors with IDs from {} to {} (exclusive).", idStart, idEnd);
-final List<String> sensors = IntStream.range(idStart, idEnd)
-.mapToObj(i -> "s_" + i)
-.collect(Collectors.toList());
// create kafka record sender
final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
-new KafkaRecordSender<>(kafkaBootstrapServers,
-kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
-final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
-final Random random = new Random();
+new KafkaRecordSender.Builder<ActivePowerRecord>(
+kafkaBootstrapServers,
+kafkaInputTopic,
+schemaRegistryUrl)
+.keyAccessor(r -> r.getIdentifier())
+.timestampAccessor(r -> r.getTimestamp())
+.defaultProperties(kafkaProperties)
+.build();
LOGGER.info("Start setting up sensors.");
for (final String sensor : sensors) {
final int initialDelay = random.nextInt(periodMs);
executor.scheduleAtFixedRate(() -> {
kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value));
}, initialDelay, periodMs, TimeUnit.MILLISECONDS);
}
LOGGER.info("Finished setting up sensors.");
// create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.instances(instances)
.keySpace(new KeySpace("s_", numSensors))
.threads(threads)
.period(Duration.of(periodMs, ChronoUnit.MILLIS))
.duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.generatorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
.zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
.kafkaRecordSender(kafkaRecordSender)
.build();
System.out.println("Wait for termination...");
executor.awaitTermination(30, TimeUnit.DAYS);
System.out.println("Will terminate now");
// start
workloadGenerator.start();
}
-private static int getInstanceId() {
-final String podName = System.getenv("POD_NAME");
-if (podName == null) {
-return 0;
-} else {
-return Pattern.compile("-")
-.splitAsStream(podName)
-.reduce((p, x) -> x)
-.map(Integer::parseInt)
-.orElse(0);
-}
-}
}
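As a side note on the getInstanceId helper above: it takes the last dash-separated token of the pod name, so a StatefulSet-style name maps to its ordinal. A self-contained illustration with a hypothetical pod name:

import java.util.regex.Pattern;

public final class InstanceIdExample {
  public static void main(final String[] args) {
    final String podName = "titan-load-generator-2"; // hypothetical POD_NAME
    final int instanceId = Pattern.compile("-")
        .splitAsStream(podName)
        .reduce((p, x) -> x) // keep only the last token
        .map(Integer::parseInt)
        .orElse(0);
    System.out.println(instanceId); // prints 2
  }
}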
@@ -4,8 +4,9 @@ import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams;
+import theodolite.commons.kafkastreams.ConfigurationKeys;
import theodolite.uc4.streamprocessing.Uc4KafkaStreamsBuilder;
-import titan.ccp.common.configuration.Configurations;
+import titan.ccp.common.configuration.ServiceConfigurations;
/**
* A microservice that manages the history and, therefore, stores and aggregates incoming
@@ -14,7 +15,7 @@ import titan.ccp.common.configuration.Configurations;
*/
public class HistoryService {
-private final Configuration config = Configurations.create();
+private final Configuration config = ServiceConfigurations.createWithDefaults();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
@@ -45,6 +46,7 @@ public class HistoryService {
.applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME))
.applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION))
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
+.schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
......
package theodolite.uc4.streamprocessing;
+import java.util.Objects;
/**
* Composed key of an hour of the day and a sensor id.
*/
@@ -26,4 +28,22 @@ public class HourOfDayKey {
return this.sensorId + ";" + this.hourOfDay;
}
+@Override
+public int hashCode() {
+return Objects.hash(this.hourOfDay, this.sensorId);
+}
+@Override
+public boolean equals(final Object obj) {
+if (obj == this) {
+return true;
+}
+if (obj instanceof HourOfDayKey) {
+final HourOfDayKey other = (HourOfDayKey) obj;
+return Objects.equals(this.hourOfDay, other.hourOfDay)
+&& Objects.equals(this.sensorId, other.sensorId);
+}
+return false;
+}
}
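The added hashCode/equals pair makes two HourOfDayKey instances with the same field values interchangeable in hash-based collections, which is what a grouping key requires. A sketch, assuming a two-argument constructor HourOfDayKey(hourOfDay, sensorId) that this excerpt does not show:

import java.util.HashSet;
import java.util.Set;

public final class HourOfDayKeyExample {
  public static void main(final String[] args) {
    final HourOfDayKey a = new HourOfDayKey(14, "sensor1"); // assumed constructor
    final HourOfDayKey b = new HourOfDayKey(14, "sensor1");
    final Set<HourOfDayKey> keys = new HashSet<>();
    keys.add(a);
    keys.add(b);
    System.out.println(a.equals(b)); // true
    System.out.println(keys.size()); // 1: both collapse to the same key
  }
}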
@@ -17,8 +17,8 @@ import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import theodolite.uc4.streamprocessing.util.StatsFactory;
import titan.ccp.common.kafka.GenericSerde;
-import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
-import titan.ccp.models.records.ActivePowerRecordFactory;
+import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
+import titan.ccp.model.records.ActivePowerRecord;
/**
* Builds Kafka Stream Topology for the History microservice.
@@ -32,6 +32,7 @@ public class TopologyBuilder {
private final String inputTopic;
private final String outputTopic;
+private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
private final Duration aggregtionDuration;
private final Duration aggregationAdvance;
@@ -41,9 +42,11 @@
* Create a new {@link TopologyBuilder} using the given topics.
*/
public TopologyBuilder(final String inputTopic, final String outputTopic,
+final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory,
final Duration aggregtionDuration, final Duration aggregationAdvance) {
this.inputTopic = inputTopic;
this.outputTopic = outputTopic;
+this.srAvroSerdeFactory = srAvroSerdeFactory;
this.aggregtionDuration = aggregtionDuration;
this.aggregationAdvance = aggregationAdvance;
}
@@ -58,14 +61,14 @@
this.builder
.stream(this.inputTopic,
Consumed.with(Serdes.String(),
-IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
+this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.selectKey((key, value) -> {
final Instant instant = Instant.ofEpochMilli(value.getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
return keyFactory.createKey(value.getIdentifier(), dateTime);
})
.groupByKey(
-Grouped.with(keySerde, IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
+Grouped.with(keySerde, this.srAvroSerdeFactory.forValues()))
.windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance))
.aggregate(
() -> Stats.of(),
......
@@ -4,6 +4,7 @@ import java.time.Duration;
import java.util.Objects;
import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
+import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/**
* Builder for the Kafka Streams configuration.
@@ -45,6 +46,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder {
final TopologyBuilder topologyBuilder = new TopologyBuilder(
this.inputTopic,
this.outputTopic,
+new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl),
this.aggregtionDuration,
this.aggregationAdvance);
......
package theodolite.kafkasender;
import java.util.Properties;
import java.util.function.Function;
import kieker.common.record.IMonitoringRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
/**
* Sends monitoring records to Kafka.
*
* @param <T> {@link IMonitoringRecord} to send
*/
public class KafkaRecordSender<T extends IMonitoringRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
private final String topic;
private final Function<T, String> keyAccessor;
private final Function<T, Long> timestampAccessor;
private final Producer<String, T> producer;
public KafkaRecordSender(final String bootstrapServers, final String topic) {
this(bootstrapServers, topic, x -> "", x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor) {
this(bootstrapServers, topic, keyAccessor, x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) {
this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties());
}
/**
* Create a new {@link KafkaRecordSender}.
*/
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor,
final Properties defaultProperties) {
this.topic = topic;
this.keyAccessor = keyAccessor;
this.timestampAccessor = timestampAccessor;
final Properties properties = new Properties();
properties.putAll(defaultProperties);
properties.put("bootstrap.servers", bootstrapServers);
// properties.put("acks", this.acknowledges);
// properties.put("batch.size", this.batchSize);
// properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory);
this.producer = new KafkaProducer<>(properties, new StringSerializer(),
IMonitoringRecordSerde.serializer());
}
/**
* Write the passed monitoring record to Kafka.
*/
public void write(final T monitoringRecord) {
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
this.producer.send(record);
}
public void terminate() {
this.producer.close();
}
}
package theodolite.uc4.workloadgenerator;
import java.io.IOException;
import java.util.List;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import theodolite.kafkasender.KafkaRecordSender;
-import titan.ccp.models.records.ActivePowerRecord;
+import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
+import theodolite.commons.workloadgeneration.dimensions.KeySpace;
+import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
+import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
+import theodolite.commons.workloadgeneration.misc.ZooKeeper;
+import titan.ccp.model.records.ActivePowerRecord;
-public class LoadGenerator {
+/**
+ * The {@code LoadGenerator} creates a load in Kafka.
+ */
+public final class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
// constants
private static final long MAX_DURATION_IN_DAYS = 30L;
// Make this a utility class, because all methods are static.
private LoadGenerator() {
throw new UnsupportedOperationException();
}
/**
* Main method.
*
* @param args CLI arguments
* @throws InterruptedException if the workload generation is interrupted
* @throws IOException if an I/O error occurs
*/
public static void main(final String[] args) throws InterruptedException, IOException {
// uc4
LOGGER.info("Start workload generator for use case UC4.");
-final int numSensor =
+// get environment variables
+final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
+final int zooKeeperPort =
+Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
+final int numSensors =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
final int periodMs =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
-final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
+final double value =
+Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "1"));
final String kafkaBootstrapServers =
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
"localhost:9092");
+final String schemaRegistryUrl =
+Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091");
final String kafkaInputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
+final int instances =
+Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
// create kafka record sender
final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
-final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
-new KafkaRecordSender<>(kafkaBootstrapServers,
-kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
-final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
-final Random random = new Random();
-final List<String> sensors =
-IntStream.range(0, numSensor).mapToObj(i -> "s_" + i).collect(Collectors.toList());
-for (final String sensor : sensors) {
-final int initialDelay = random.nextInt(periodMs);
-executor.scheduleAtFixedRate(() -> {
-kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value));
-}, initialDelay, periodMs, TimeUnit.MILLISECONDS);
-}
+final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
+new KafkaRecordSender.Builder<ActivePowerRecord>(
+kafkaBootstrapServers,
+kafkaInputTopic,
+schemaRegistryUrl)
+.keyAccessor(r -> r.getIdentifier())
+.timestampAccessor(r -> r.getTimestamp())
+.defaultProperties(kafkaProperties)
+.build();
System.out.println("Wait for termination...");
executor.awaitTermination(30, TimeUnit.DAYS);
System.out.println("Will terminate now");
// create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.instances(instances)
.keySpace(new KeySpace("s_", numSensors))
.threads(threads)
.period(Duration.of(periodMs, ChronoUnit.MILLIS))
.duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.generatorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
.zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
.kafkaRecordSender(kafkaRecordSender)
.build();
// start
workloadGenerator.start();
}
}
cleanup.add_default_serial_version_id=true
cleanup.add_generated_serial_version_id=false
cleanup.add_missing_annotations=true
cleanup.add_missing_deprecated_annotations=true
cleanup.add_missing_methods=false
cleanup.add_missing_nls_tags=false
cleanup.add_missing_override_annotations=true
cleanup.add_missing_override_annotations_interface_methods=true
cleanup.add_serial_version_id=false
cleanup.always_use_blocks=true
cleanup.always_use_parentheses_in_expressions=false
cleanup.always_use_this_for_non_static_field_access=true
cleanup.always_use_this_for_non_static_method_access=true
cleanup.convert_functional_interfaces=false
cleanup.convert_to_enhanced_for_loop=true
cleanup.correct_indentation=true
cleanup.format_source_code=true
cleanup.format_source_code_changes_only=false
cleanup.insert_inferred_type_arguments=false
cleanup.make_local_variable_final=true
cleanup.make_parameters_final=true
cleanup.make_private_fields_final=true
cleanup.make_type_abstract_if_missing_method=false
cleanup.make_variable_declarations_final=true
cleanup.never_use_blocks=false
cleanup.never_use_parentheses_in_expressions=true
cleanup.organize_imports=true
cleanup.qualify_static_field_accesses_with_declaring_class=false
cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
cleanup.remove_trailing_whitespaces_ignore_empty=false
cleanup.remove_unnecessary_casts=true
cleanup.remove_unnecessary_nls_tags=true
cleanup.remove_unused_imports=true
cleanup.remove_unused_local_variables=false
cleanup.remove_unused_private_fields=true
cleanup.remove_unused_private_members=false
cleanup.remove_unused_private_methods=true
cleanup.remove_unused_private_types=true
cleanup.sort_members=false
cleanup.sort_members_all=false
cleanup.use_anonymous_class_creation=false
cleanup.use_blocks=true
cleanup.use_blocks_only_for_return_and_throw=false
cleanup.use_lambda=true
cleanup.use_parentheses_in_expressions=true
cleanup.use_this_for_non_static_field_access=true
cleanup.use_this_for_non_static_field_access_only_if_necessary=false
cleanup.use_this_for_non_static_method_access=true
cleanup.use_this_for_non_static_method_access_only_if_necessary=false
cleanup_profile=_CAU-SE-Style
cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
sp_cleanup.add_missing_deprecated_annotations=true
sp_cleanup.add_missing_methods=false
sp_cleanup.add_missing_nls_tags=false
sp_cleanup.add_missing_override_annotations=true
sp_cleanup.add_missing_override_annotations_interface_methods=true
sp_cleanup.add_serial_version_id=false
sp_cleanup.always_use_blocks=true
sp_cleanup.always_use_parentheses_in_expressions=false
sp_cleanup.always_use_this_for_non_static_field_access=true
sp_cleanup.always_use_this_for_non_static_method_access=true
sp_cleanup.convert_functional_interfaces=false
sp_cleanup.convert_to_enhanced_for_loop=true
sp_cleanup.correct_indentation=true
sp_cleanup.format_source_code=true
sp_cleanup.format_source_code_changes_only=false
sp_cleanup.insert_inferred_type_arguments=false
sp_cleanup.make_local_variable_final=true
sp_cleanup.make_parameters_final=true
sp_cleanup.make_private_fields_final=true
sp_cleanup.make_type_abstract_if_missing_method=false
sp_cleanup.make_variable_declarations_final=true
sp_cleanup.never_use_blocks=false
sp_cleanup.never_use_parentheses_in_expressions=true
sp_cleanup.on_save_use_additional_actions=true
sp_cleanup.organize_imports=true
sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_redundant_modifiers=false
sp_cleanup.remove_redundant_semicolons=true
sp_cleanup.remove_redundant_type_arguments=true
sp_cleanup.remove_trailing_whitespaces=true
sp_cleanup.remove_trailing_whitespaces_all=true
sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
sp_cleanup.remove_unnecessary_casts=true
sp_cleanup.remove_unnecessary_nls_tags=true
sp_cleanup.remove_unused_imports=true
sp_cleanup.remove_unused_local_variables=false
sp_cleanup.remove_unused_private_fields=true
sp_cleanup.remove_unused_private_members=false
sp_cleanup.remove_unused_private_methods=true
sp_cleanup.remove_unused_private_types=true
sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false
sp_cleanup.use_anonymous_class_creation=false
sp_cleanup.use_blocks=true
sp_cleanup.use_blocks_only_for_return_and_throw=false
sp_cleanup.use_lambda=true
sp_cleanup.use_parentheses_in_expressions=true
sp_cleanup.use_this_for_non_static_field_access=true
sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
sp_cleanup.use_this_for_non_static_method_access=true
sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
configFilePath=../config/checkstyle.xml
customModulesJarPaths=
eclipse.preferences.version=1
enabled=true
customRulesJars=
eclipse.preferences.version=1
enabled=true
ruleSetFilePath=../config/pmd.xml
dependencies {
implementation 'org.apache.curator:curator-recipes:4.3.0'
}
\ No newline at end of file
-package theodolite.kafkasender;
+package theodolite.commons.workloadgeneration.communication.kafka;
import java.util.Properties;
import java.util.function.Function;
-import kieker.common.record.IMonitoringRecord;
+import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
+import theodolite.commons.workloadgeneration.functions.Transport;
+import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/**
* Sends monitoring records to Kafka.
*
* @param <T> {@link IMonitoringRecord} to send
*/
-public class KafkaRecordSender<T extends IMonitoringRecord> {
+public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
@@ -29,40 +29,74 @@ public class KafkaRecordSender<T extends IMonitoringRecord> {
private final Producer<String, T> producer;
-public KafkaRecordSender(final String bootstrapServers, final String topic) {
-this(bootstrapServers, topic, x -> "", x -> null, new Properties());
-}
-public KafkaRecordSender(final String bootstrapServers, final String topic,
-final Function<T, String> keyAccessor) {
-this(bootstrapServers, topic, keyAccessor, x -> null, new Properties());
-}
-public KafkaRecordSender(final String bootstrapServers, final String topic,
-final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) {
-this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties());
-}
/**
* Create a new {@link KafkaRecordSender}.
*/
-public KafkaRecordSender(final String bootstrapServers, final String topic,
-final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor,
-final Properties defaultProperties) {
-this.topic = topic;
-this.keyAccessor = keyAccessor;
-this.timestampAccessor = timestampAccessor;
+private KafkaRecordSender(final Builder<T> builder) {
+this.topic = builder.topic;
+this.keyAccessor = builder.keyAccessor;
+this.timestampAccessor = builder.timestampAccessor;
final Properties properties = new Properties();
-properties.putAll(defaultProperties);
-properties.put("bootstrap.servers", bootstrapServers);
+properties.putAll(builder.defaultProperties);
+properties.put("bootstrap.servers", builder.bootstrapServers);
// properties.put("acks", this.acknowledges);
// properties.put("batch.size", this.batchSize);
// properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory);
+final SchemaRegistryAvroSerdeFactory avroSerdeFactory =
+new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl);
this.producer = new KafkaProducer<>(properties, new StringSerializer(),
-IMonitoringRecordSerde.serializer());
+avroSerdeFactory.<T>forKeys().serializer());
}
+/**
+* Builder class to build a new {@link KafkaRecordSender}.
+*
+* @param <T> Type of the records that should later be sent.
+*/
+public static class Builder<T extends SpecificRecord> {
+private final String bootstrapServers;
+private final String topic;
+private final String schemaRegistryUrl;
+private Function<T, String> keyAccessor = x -> ""; // NOPMD
+private Function<T, Long> timestampAccessor = x -> null; // NOPMD
+private Properties defaultProperties = new Properties(); // NOPMD
+/**
+* Creates a Builder object for a {@link KafkaRecordSender}.
+*
+* @param bootstrapServers the bootstrap servers for accessing Kafka.
+* @param topic the Kafka topic to write to.
+* @param schemaRegistryUrl the URL of the schema registry for Avro.
+*/
+public Builder(final String bootstrapServers, final String topic,
+final String schemaRegistryUrl) {
+this.bootstrapServers = bootstrapServers;
+this.topic = topic;
+this.schemaRegistryUrl = schemaRegistryUrl;
+}
+public Builder<T> keyAccessor(final Function<T, String> keyAccessor) {
+this.keyAccessor = keyAccessor;
+return this;
+}
+public Builder<T> timestampAccessor(final Function<T, Long> timestampAccessor) {
+this.timestampAccessor = timestampAccessor;
+return this;
+}
+public Builder<T> defaultProperties(final Properties defaultProperties) {
+this.defaultProperties = defaultProperties;
+return this;
+}
+public KafkaRecordSender<T> build() {
+return new KafkaRecordSender<>(this);
+}
+}
/**
@@ -81,4 +115,9 @@ public class KafkaRecordSender<T extends IMonitoringRecord> {
this.producer.close();
}
+@Override
+public void transport(final T message) {
+this.write(message);
+}
}
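A usage sketch of the new builder API, mirroring the call sites in the load generators above; broker, topic and registry URL are placeholders.

import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import titan.ccp.model.records.ActivePowerRecord;

public final class BuilderExample {
  public static void main(final String[] args) {
    final KafkaRecordSender<ActivePowerRecord> sender =
        new KafkaRecordSender.Builder<ActivePowerRecord>(
            "localhost:9092", // bootstrap servers (placeholder)
            "input", // target topic (placeholder)
            "http://localhost:8091") // schema registry URL (placeholder)
                .keyAccessor(r -> r.getIdentifier())
                .timestampAccessor(r -> r.getTimestamp())
                .build();
    sender.write(new ActivePowerRecord("s_0", System.currentTimeMillis(), 10.0));
    sender.terminate();
  }
}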
package theodolite.commons.workloadgeneration.communication.zookeeper;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.functions.BeforeAction;
import theodolite.commons.workloadgeneration.misc.WorkloadDefinition;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
/**
* The central class responsible for distributing the workload through all workload generators.
*/
public class WorkloadDistributor {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadDistributor.class);
private static final String NAMESPACE = "workload-generation";
private static final String COUNTER_PATH = "/counter";
private static final String WORKLOAD_PATH = "/workload";
private static final String WORKLOAD_DEFINITION_PATH = "/workload/definition";
// Curator retry strategy
private static final int BASE_SLEEP_TIME_MS = 2000;
private static final int MAX_RETRIES = 5;
// Wait time
private static final int MAX_WAIT_TIME = 20_000;
private final DistributedAtomicInteger counter;
private final KeySpace keySpace;
private final BeforeAction beforeAction;
private final BiConsumer<WorkloadDefinition, Integer> workerAction;
private final int instances;
private final ZooKeeper zooKeeper; // NOPMD keep instance variable instead of local variable
private final CuratorFramework client;
private boolean workloadGenerationStarted = false; // NOPMD explicit intention that false
/**
* Create a new workload distributor.
*
* @param keySpace the keyspace for the workload generation.
* @param beforeAction the before action for the workload generation.
* @param workerAction the action to perform by the workers.
*/
public WorkloadDistributor(
final int instances,
final ZooKeeper zooKeeper,
final KeySpace keySpace,
final BeforeAction beforeAction,
final BiConsumer<WorkloadDefinition, Integer> workerAction) {
this.instances = instances;
this.zooKeeper = zooKeeper;
this.keySpace = keySpace;
this.beforeAction = beforeAction;
this.workerAction = workerAction;
this.client = CuratorFrameworkFactory.builder()
.namespace(NAMESPACE)
.connectString(this.zooKeeper.getHost() + ":" + this.zooKeeper.getPort())
.retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES))
.build();
this.client.start();
try {
this.client.blockUntilConnected();
} catch (final InterruptedException e) {
LOGGER.error(e.getMessage(), e);
throw new IllegalStateException(e);
}
this.counter =
new DistributedAtomicInteger(this.client, COUNTER_PATH,
new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
}
/**
* Start the workload distribution.
*/
public void start() {
try {
AtomicValue<Integer> result = this.counter.increment();
while (!result.succeeded()) {
result = this.counter.increment();
}
final int workerId = result.preValue();
final CuratorWatcher watcher = this.buildWatcher(workerId);
final Stat nodeExists =
this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_PATH);
if (nodeExists == null) {
this.client.create().forPath(WORKLOAD_PATH);
}
if (workerId == 0) {
LOGGER.info("This instance is master with id {}", workerId);
this.beforeAction.run();
// register worker action, as master acts also as worker
this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH);
LOGGER.info("Number of Workers: {}", this.instances);
final WorkloadDefinition definition =
new WorkloadDefinition(this.keySpace, this.instances);
this.client.create().withMode(CreateMode.EPHEMERAL).forPath(WORKLOAD_DEFINITION_PATH,
definition.toString().getBytes(StandardCharsets.UTF_8));
} else {
LOGGER.info("This instance is worker with id {}", workerId);
this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH);
final Stat definitionExists =
this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH);
if (definitionExists != null) {
this.startWorkloadGeneration(workerId);
}
}
Thread.sleep(MAX_WAIT_TIME);
if (!this.workloadGenerationStarted) {
LOGGER.warn("No workload definition retrieved for 20 s. Terminating now...");
}
} catch (final Exception e) { // NOPMD need to catch exception because of external framework
LOGGER.error(e.getMessage(), e);
throw new IllegalStateException("Error when starting the distribution of the workload.", e);
}
}
/**
* Start the workload generation. This method's body is executed only once.
*
* @param workerId the ID of this worker
* @throws Exception when an error occurs
*/
// NOPMD because exception thrown from used framework
private synchronized void startWorkloadGeneration(final int workerId) throws Exception { // NOPMD
if (!this.workloadGenerationStarted) {
this.workloadGenerationStarted = true;
final byte[] bytes =
this.client.getData().forPath(WORKLOAD_DEFINITION_PATH);
final WorkloadDefinition definition =
WorkloadDefinition.fromString(new String(bytes, StandardCharsets.UTF_8));
this.workerAction.accept(definition, workerId);
}
}
/**
* Build a curator watcher which performs the worker action.
*
* @param workerId the ID of the worker to create the watcher for.
* @return the curator watcher.
*/
private CuratorWatcher buildWatcher(final int workerId) {
return new CuratorWatcher() {
@Override
public void process(final WatchedEvent event) {
if (event.getType() == EventType.NodeChildrenChanged) {
try {
WorkloadDistributor.this.startWorkloadGeneration(workerId);
} catch (final Exception e) { // NOPMD external framework throws exception
LOGGER.error(e.getMessage(), e);
throw new IllegalStateException("Error starting workload generation.", e);
}
}
}
};
}
/**
* Stop the workload distributor.
*/
public void stop() {
this.client.close();
}
}
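The distributor's protocol in short: every instance increments the shared counter; the instance that observes preValue 0 becomes the master, runs the before action and publishes the WorkloadDefinition as an ephemeral node, while the others react via the watcher. A minimal wiring sketch with placeholder values:

import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;

public final class DistributorExample {
  public static void main(final String[] args) {
    final WorkloadDistributor distributor = new WorkloadDistributor(
        2, // number of instances
        new ZooKeeper("localhost", 2181), // placeholder ZooKeeper address
        new KeySpace("s_", 10), // keys s_0 ... s_9
        () -> System.out.println("before action"),
        (definition, workerId) -> System.out.println("worker " + workerId));
    distributor.start(); // waits up to MAX_WAIT_TIME for a workload definition
    distributor.stop();
  }
}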
package theodolite.commons.workloadgeneration.dimensions;
import theodolite.commons.workloadgeneration.generators.AbstractWorkloadGenerator;
/**
* Wrapper class for the definition of the keys that should be used by the
* {@link AbstractWorkloadGenerator}.
*/
public class KeySpace {
private final String prefix;
private final int min;
private final int max;
/**
* Create a new key space. All keys will have the prefix {@code prefix}. The remaining part of
* each key is a number in the interval [{@code min}, {@code max}].
*
* @param prefix the prefix to use for all keys
* @param min the lower bound (inclusive) to start counting from
* @param max the upper bound (inclusive) to count to
*/
public KeySpace(final String prefix, final int min, final int max) {
if (prefix == null || prefix.contains(";")) {
throw new IllegalArgumentException(
"The prefix must not be null and must not contain the ';' character.");
}
this.prefix = prefix;
this.min = min;
this.max = max;
}
public KeySpace(final String prefix, final int numberOfKeys) {
this(prefix, 0, numberOfKeys - 1);
}
public KeySpace(final int numberOfKeys) {
this("sensor_", 0, numberOfKeys - 1);
}
public String getPrefix() {
return this.prefix;
}
public int getMin() {
return this.min;
}
public int getMax() {
return this.max;
}
}
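Given the constructors above, the two-argument form delegates to (prefix, 0, numberOfKeys - 1), so a key space built from a count and one built from explicit bounds can describe the same keys:

public final class KeySpaceExample {
  public static void main(final String[] args) {
    // Both key spaces describe the keys s_0 ... s_9.
    final KeySpace byCount = new KeySpace("s_", 10);
    final KeySpace byBounds = new KeySpace("s_", 0, 9);
    System.out.println(byCount.getMin() == byBounds.getMin()); // true (0)
    System.out.println(byCount.getMax() == byBounds.getMax()); // true (9)
  }
}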
package theodolite.commons.workloadgeneration.functions;
/**
* Describes the before action which is executed before every sub experiment.
*/
@FunctionalInterface
public interface BeforeAction {
public void run();
}
package theodolite.commons.workloadgeneration.functions;
/**
* This interface describes a function that takes meta information from a string (e.g. an ID) and
* produces an object of type T.
*
* @param <T> the type of the objects that will be generated by the function.
*/
@FunctionalInterface
public interface MessageGenerator<T> {
T generateMessage(final String key);
}
package theodolite.commons.workloadgeneration.functions;
/**
* This interface describes a function that consumes a message of type {@code T}. It is intended
* to transport individual messages to the messaging system.
*
* @param <T> the type of records to send as messages.
*/
@FunctionalInterface
public interface Transport<T> {
void transport(final T message);
}
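Both functional interfaces are typically satisfied with lambdas. A sketch using the Avro ActivePowerRecord as the message type; the constant value is a placeholder, and the println transport stands in for a Kafka sender:

import titan.ccp.model.records.ActivePowerRecord;

public final class FunctionsExample {
  public static void main(final String[] args) {
    final MessageGenerator<ActivePowerRecord> generator =
        key -> new ActivePowerRecord(key, System.currentTimeMillis(), 10.0);
    final Transport<ActivePowerRecord> transport =
        message -> System.out.println(message); // stand-in for KafkaRecordSender
    transport.transport(generator.generateMessage("s_0"));
  }
}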
package theodolite.commons.workloadgeneration.generators;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.communication.zookeeper.WorkloadDistributor;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.functions.BeforeAction;
import theodolite.commons.workloadgeneration.functions.MessageGenerator;
import theodolite.commons.workloadgeneration.functions.Transport;
import theodolite.commons.workloadgeneration.misc.WorkloadDefinition;
import theodolite.commons.workloadgeneration.misc.WorkloadEntity;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
/**
* Base for workload generators.
*
* @param <T> The type of records the workload generator is dedicated for.
*/
public abstract class AbstractWorkloadGenerator<T>
implements WorkloadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWorkloadGenerator.class);
private final int instances; // NOPMD keep instance variable instead of local variable
private final ZooKeeper zooKeeper; // NOPMD keep instance variable instead of local variable
private final KeySpace keySpace;// NOPMD keep instance variable instead of local variable
private final BeforeAction beforeAction; // NOPMD keep instance variable instead of local variable
private final BiFunction<WorkloadDefinition, Integer, List<WorkloadEntity<T>>> workloadSelector;
private final MessageGenerator<T> generatorFunction;
private final Transport<T> transport;
private WorkloadDistributor workloadDistributor; // NOPMD keep instance variable instead of local
private final ScheduledExecutorService executor;
/**
* Create a new workload generator.
*
* @param instances the number of workload-generator instances.
* @param zooKeeper the zookeeper connection.
* @param keySpace the keyspace.
* @param threads the number of threads used to generate the load.
* @param period the period at which new records are emitted.
* @param duration the maximum runtime.
* @param beforeAction the action to perform before the workload generation starts.
* @param generatorFunction the function that is used to generate the individual records.
* @param transport the function that is used to send generated messages to the messaging system.
*/
public AbstractWorkloadGenerator(
final int instances,
final ZooKeeper zooKeeper,
final KeySpace keySpace,
final int threads,
final Duration period,
final Duration duration,
final BeforeAction beforeAction,
final MessageGenerator<T> generatorFunction,
final Transport<T> transport) {
this.instances = instances;
this.zooKeeper = zooKeeper;
this.keySpace = keySpace;
this.beforeAction = beforeAction;
this.generatorFunction = generatorFunction;
this.workloadSelector = (workloadDefinition, workerId) -> {
final List<WorkloadEntity<T>> workloadEntities = new LinkedList<>();
for (int i =
workloadDefinition.getKeySpace().getMin() + workerId; i <= workloadDefinition
.getKeySpace().getMax(); i += workloadDefinition.getNumberOfWorkers()) {
final String id = workloadDefinition.getKeySpace().getPrefix() + i;
workloadEntities.add(new WorkloadEntity<>(id, this.generatorFunction));
}
return workloadEntities;
};
this.transport = transport;
this.executor = Executors.newScheduledThreadPool(threads);
final Random random = new Random();
final int periodMs = (int) period.toMillis();
LOGGER.info("Period: {}", periodMs);
final BiConsumer<WorkloadDefinition, Integer> workerAction = (declaration, workerId) -> {
final List<WorkloadEntity<T>> entities = this.workloadSelector.apply(declaration, workerId);
LOGGER.info("Beginning of Experiment...");
LOGGER.info("Generating records for {} keys.", entities.size());
LOGGER.info("Experiment is going to be executed for the specified duration...");
entities.forEach(entity -> {
final long initialDelay = random.nextInt(periodMs);
final Runnable task = () -> this.transport.transport(entity.generateMessage());
this.executor.scheduleAtFixedRate(task, initialDelay, periodMs, TimeUnit.MILLISECONDS);
});
try {
this.executor.awaitTermination(duration.getSeconds(), TimeUnit.SECONDS);
LOGGER.info("Terminating now...");
this.stop();
} catch (final InterruptedException e) {
LOGGER.error("", e);
throw new IllegalStateException("Error when terminating the workload generation.", e);
}
};
this.workloadDistributor = new WorkloadDistributor(
this.instances,
this.zooKeeper,
this.keySpace,
this.beforeAction,
workerAction);
}
/**
* Start the workload generation. The generation terminates automatically after the specified
* {@code duration}.
*/
@Override
public void start() {
this.workloadDistributor.start();
}
@Override
public void stop() {
this.workloadDistributor.stop();
}
}
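The workloadSelector above assigns keys round-robin: worker w of n workers handles min + w, min + w + n, and so on up to the inclusive max. A standalone sketch of that arithmetic for ten keys and two workers:

public final class PartitioningExample {
  public static void main(final String[] args) {
    final int min = 0;
    final int max = 9; // inclusive, as in the selector loop above
    final int workers = 2;
    for (int workerId = 0; workerId < workers; workerId++) {
      final StringBuilder keys = new StringBuilder("worker " + workerId + ":");
      for (int i = min + workerId; i <= max; i += workers) {
        keys.append(" s_").append(i);
      }
      System.out.println(keys); // worker 0: s_0 s_2 ... / worker 1: s_1 s_3 ...
    }
  }
}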
package theodolite.commons.workloadgeneration.generators;
import java.time.Duration;
import org.apache.avro.specific.SpecificRecord;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.functions.BeforeAction;
import theodolite.commons.workloadgeneration.functions.MessageGenerator;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
/**
* Workload generator for generating load for the kafka messaging system.
*
* @param <T> The type of records the workload generator is dedicated for.
*/
public class KafkaWorkloadGenerator<T extends SpecificRecord>
extends AbstractWorkloadGenerator<T> {
private final KafkaRecordSender<T> recordSender;
/**
* Create a new workload generator.
*
* @param zooKeeper a reference to the ZooKeeper instance.
* @param keySpace the key space to generate the workload for.
* @param threads the number of threads to use per instance.
* @param period the period at which a message is generated for each key specified in the
* {@code keySpace}
* @param duration the duration for which the workload generator emits messages.
* @param beforeAction the action which will be performed before the workload generator starts
* generating messages. If {@code null}, no before action will be performed.
* @param generatorFunction the generator function. This function is executed each time a message
* is generated.
* @param recordSender the record sender which is used to send the generated messages to kafka.
*/
public KafkaWorkloadGenerator(
final int instances,
final ZooKeeper zooKeeper,
final KeySpace keySpace,
final int threads,
final Duration period,
final Duration duration,
final BeforeAction beforeAction,
final MessageGenerator<T> generatorFunction,
final KafkaRecordSender<T> recordSender) {
super(instances, zooKeeper, keySpace, threads, period, duration, beforeAction,
generatorFunction,
recordSender);
this.recordSender = recordSender;
}
@Override
public void stop() {
this.recordSender.terminate();
super.stop();
}
}