Skip to content
Snippets Groups Projects

Zookeeper free workload generator

Merged Sören Henning requested to merge zookeeper-free-workload-generator into master
Files
3
@@ -5,12 +5,16 @@ import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
/**
* A Theodolite load generator.
*/
public final class LoadGenerator {
private static final String SENSOR_PREFIX_DEFAULT = "s_";
private static final int NUMBER_OF_KEYS_DEFAULT = 10;
private static final int PERIOD_MS_DEFAULT = 1000;
private static final int VALUE_DEFAULT = 10;
private static final int THREADS_DEFAULT = 4;
private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
private static final String KAFKA_TOPIC_DEFAULT = "input";
private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD
@@ -49,6 +53,9 @@ public final class LoadGenerator {
return this;
}
/**
* Run the constructed load generator until cancellation.
*/
public void run() {
Objects.requireNonNull(this.clusterConfig, "No cluster config set.");
Objects.requireNonNull(this.generatorConfig, "No generator config set.");
@@ -64,6 +71,9 @@ public final class LoadGenerator {
runner.runBlocking();
}
/**
* Create a basic {@link LoadGenerator} from its default values.
*/
public static LoadGenerator fromDefaults() {
return new LoadGenerator()
.setClusterConfig(new ClusterConfig())
@@ -79,6 +89,9 @@ public final class LoadGenerator {
.forConstantValue(VALUE_DEFAULT)));
}
/**
* Create a basic {@link LoadGenerator} from environment variables.
*/
public static LoadGenerator fromEnvironment() {
final String bootstrapServer = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER),
@@ -101,6 +114,9 @@ public final class LoadGenerator {
final double value = Double.parseDouble(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.VALUE),
Integer.toString(VALUE_DEFAULT)));
final int threads = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.THREADS),
Integer.toString(THREADS_DEFAULT)));
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
KAFKA_BOOTSTRAP_SERVERS_DEFAULT);
@@ -134,12 +150,8 @@ public final class LoadGenerator {
kafkaInputTopic,
schemaRegistryUrl,
kafkaProperties)
.forConstantValue(value)));
}
public static void main(final String[] args) {
LoadGenerator.fromEnvironment()
.run();
.forConstantValue(value)))
.withThreads(threads);
}
}
Loading