diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java index 2e5fc627e4d9467aed5ec5e279c56070f7d0ccfe..b4f2d9ab4aedd6c030e3d99a6f0848432e7361dc 100644 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java @@ -14,6 +14,7 @@ public final class LoadGenerator { private static final int NUMBER_OF_KEYS_DEFAULT = 10; private static final int PERIOD_MS_DEFAULT = 1000; private static final int VALUE_DEFAULT = 10; + private static final int THREADS_DEFAULT = 4; private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; private static final String KAFKA_TOPIC_DEFAULT = "input"; private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD @@ -113,6 +114,9 @@ public final class LoadGenerator { final double value = Double.parseDouble(Objects.requireNonNullElse( System.getenv(ConfigurationKeys.VALUE), Integer.toString(VALUE_DEFAULT))); + final int threads = Integer.parseInt(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.THREADS), + Integer.toString(THREADS_DEFAULT))); final String kafkaBootstrapServers = Objects.requireNonNullElse( System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), KAFKA_BOOTSTRAP_SERVERS_DEFAULT); @@ -146,7 +150,8 @@ public final class LoadGenerator { kafkaInputTopic, schemaRegistryUrl, kafkaProperties) - .forConstantValue(value))); + .forConstantValue(value)) + .setThreads(threads)); } }