diff --git a/benchmarks/workload-generator-commons/build.gradle b/benchmarks/workload-generator-commons/build.gradle index 8236a89c15e05b4cf5a5237b68b838eaf3eb0126..5a0f2da2501df6ba949767aa914fbf631cc36483 100644 --- a/benchmarks/workload-generator-commons/build.gradle +++ b/benchmarks/workload-generator-commons/build.gradle @@ -1,4 +1,5 @@ dependencies { implementation 'com.google.guava:guava:30.1-jre' implementation 'com.hazelcast:hazelcast:4.1.1' + implementation 'com.hazelcast:hazelcast-kubernetes:4.1.1' } \ No newline at end of file diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ClusterConfig.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ClusterConfig.java index e629a13bf839ead1ced2914c1c30c12344ce46da..18f7623f7a259c2789553f61b8c05be481adedff 100644 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ClusterConfig.java +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ClusterConfig.java @@ -3,43 +3,46 @@ package theodolite.commons.workloadgeneration; /** * Configuration of a load generator cluster. */ -public class ClusterConfig { - public static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; +public final class ClusterConfig { + /* + * public static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; public static final int + * PORT_DEFAULT = 5701; public static final boolean PORT_AUTO_INCREMENT_DEFAULT = true; public + * static final String CLUSTER_NAME_PREFIX_DEFAULT = "theodolite-load-generation"; + */ + public static final int PORT_DEFAULT = 5701; - public static final boolean PORT_AUTO_INCREMENT_DEFAULT = true; public static final String CLUSTER_NAME_PREFIX_DEFAULT = "theodolite-load-generation"; private final String bootstrapServer; - private final int port; - private final boolean portAutoIncrement; - private final String clusterNamePrefix; - - /** - * Create a new {@link ClusterConfig} with its default values. - */ - public ClusterConfig() { - this( - BOOTSTRAP_SERVER_DEFAULT, - PORT_DEFAULT, - PORT_AUTO_INCREMENT_DEFAULT, - CLUSTER_NAME_PREFIX_DEFAULT); - } + private final String kubernetesDnsName; + private int port = PORT_DEFAULT; + private boolean portAutoIncrement = true; + private String clusterNamePrefix = CLUSTER_NAME_PREFIX_DEFAULT; /** * Create a new {@link ClusterConfig} with the given parameter values. */ - public ClusterConfig(final String bootstrapServer, final int port, - final boolean portAutoIncrement, final String clusterNamePrefix) { + private ClusterConfig(final String bootstrapServer, final String kubernetesDnsName) { this.bootstrapServer = bootstrapServer; - this.port = port; - this.portAutoIncrement = portAutoIncrement; - this.clusterNamePrefix = clusterNamePrefix; + this.kubernetesDnsName = kubernetesDnsName; + } + + public boolean hasBootstrapServer() { + return this.bootstrapServer != null; } public String getBootstrapServer() { return this.bootstrapServer; } + public boolean hasKubernetesDnsName() { + return this.kubernetesDnsName != null; + } + + public String getKubernetesDnsName() { + return this.kubernetesDnsName; + } + public int getPort() { return this.port; } @@ -48,9 +51,31 @@ public class ClusterConfig { return this.portAutoIncrement; } + public ClusterConfig setPortAutoIncrement(final boolean portAutoIncrement) { // NOPMD + this.portAutoIncrement = portAutoIncrement; + return this; + } + + public ClusterConfig setPort(final int port) { // NOPMD + this.port = port; + return this; + } + public String getClusterNamePrefix() { return this.clusterNamePrefix; } + public ClusterConfig setClusterNamePrefix(final String clusterNamePrefix) { // NOPMD + this.clusterNamePrefix = clusterNamePrefix; + return this; + } + + public static ClusterConfig fromBootstrapServer(final String bootstrapServer) { + return new ClusterConfig(bootstrapServer, null); + } + + public static ClusterConfig fromKubernetesDnsName(final String kubernetesDnsName) { + return new ClusterConfig(null, kubernetesDnsName); + } } diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java index 81610170cb3ea87edbe7eed61133f50caf291aa3..45ac1d5bb9c21a1b6303de2f248d08b69c02fc28 100644 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java @@ -7,6 +7,8 @@ public final class ConfigurationKeys { public static final String BOOTSTRAP_SERVER = "BOOTSTRAP_SERVER"; + public static final String KUBERNETES_DNS_NAME = "KUBERNETES_DNS_NAME"; + public static final String PORT = "PORT"; public static final String PORT_AUTO_INCREMENT = "PORT_AUTO_INCREMENT"; diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunner.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunner.java index 72eccc66a862d136118e528961c4a13a07702ddc..c010492950c5caace9ff85baefee1af4e46d25bb 100644 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunner.java +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunner.java @@ -15,6 +15,7 @@ import java.util.concurrent.CompletableFuture; */ public class HazelcastRunner { + private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns"; private final HazelcastInstance hzInstance; private volatile HazelcastRunnerStateInstance runnerState; private final CompletableFuture<Void> stopAction = new CompletableFuture<>(); @@ -92,8 +93,13 @@ public class HazelcastRunner { .setPortAutoIncrement(cluster.isPortAutoIncrement()) .getJoin(); joinConfig.getMulticastConfig().setEnabled(false); - // joinConfig.getKubernetesConfig().setEnabled(true); - joinConfig.getTcpIpConfig().addMember(cluster.getBootstrapServer()); + if (cluster.hasBootstrapServer()) { + joinConfig.getTcpIpConfig().addMember(cluster.getBootstrapServer()); + } else if (cluster.hasKubernetesDnsName()) { + joinConfig.getKubernetesConfig() + .setEnabled(true) + .setProperty(HZ_KUBERNETES_SERVICE_DNS_KEY, cluster.getKubernetesDnsName()); + } return Hazelcast.newHazelcastInstance(config); } diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java index 685f4e5b65c73888e8eecab149cddc4bb9aec2b8..96037fc6d0091ba5ac4194815bf0f435771f2f1d 100644 --- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java +++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java @@ -4,12 +4,17 @@ import java.time.Duration; import java.util.Objects; import java.util.Properties; import org.apache.kafka.clients.producer.ProducerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A Theodolite load generator. */ public final class LoadGenerator { + private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); + + private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; private static final String SENSOR_PREFIX_DEFAULT = "s_"; private static final int NUMBER_OF_KEYS_DEFAULT = 10; private static final int PERIOD_MS_DEFAULT = 1000; @@ -17,7 +22,7 @@ public final class LoadGenerator { private static final int THREADS_DEFAULT = 4; private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; private static final String KAFKA_TOPIC_DEFAULT = "input"; - private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD + private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:19092"; // NOPMD private ClusterConfig clusterConfig; private WorkloadDefinition loadDefinition; @@ -81,7 +86,7 @@ public final class LoadGenerator { */ public static LoadGenerator fromDefaults() { return new LoadGenerator() - .setClusterConfig(new ClusterConfig()) + .setClusterConfig(ClusterConfig.fromBootstrapServer(BOOTSTRAP_SERVER_DEFAULT)) .setLoadDefinition(new WorkloadDefinition( new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT), Duration.ofMillis(PERIOD_MS_DEFAULT))) @@ -98,18 +103,36 @@ public final class LoadGenerator { * Create a basic {@link LoadGenerator} from environment variables. */ public static LoadGenerator fromEnvironment() { - final String bootstrapServer = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER), - ClusterConfig.BOOTSTRAP_SERVER_DEFAULT); - final int port = Integer.parseInt(Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.PORT), - Integer.toString(ClusterConfig.PORT_DEFAULT))); - final boolean portAutoIncrement = Boolean.parseBoolean(Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT), - Boolean.toString(ClusterConfig.PORT_AUTO_INCREMENT_DEFAULT))); - final String clusterNamePrefix = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX), - ClusterConfig.CLUSTER_NAME_PREFIX_DEFAULT); + final String bootstrapServer = System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER); + final String kubernetesDnsName = System.getenv(ConfigurationKeys.KUBERNETES_DNS_NAME); + + ClusterConfig clusterConfig; + if (bootstrapServer != null) { // NOPMD + clusterConfig = ClusterConfig.fromBootstrapServer(bootstrapServer); + LOGGER.info("Use bootstrap server '{}'.", bootstrapServer); + } else if (kubernetesDnsName != null) { // NOPMD + clusterConfig = ClusterConfig.fromKubernetesDnsName(kubernetesDnsName); + LOGGER.info("Use Kubernetes DNS name '{}'.", kubernetesDnsName); + } else { + throw new IllegalArgumentException( + "Neither a bootstrap server nor a Kubernetes DNS name was provided."); + } + + final String port = System.getenv(ConfigurationKeys.PORT); + if (port != null) { + clusterConfig.setPort(Integer.parseInt(port)); + } + + final String portAutoIncrement = System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT); + if (portAutoIncrement != null) { + clusterConfig.setPortAutoIncrement(Boolean.parseBoolean(portAutoIncrement)); + } + + final String clusterNamePrefix = System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX); + if (clusterNamePrefix != null) { + clusterConfig.setClusterNamePrefix(portAutoIncrement); + } + final int numSensors = Integer.parseInt(Objects.requireNonNullElse( System.getenv(ConfigurationKeys.NUM_SENSORS), Integer.toString(NUMBER_OF_KEYS_DEFAULT))); @@ -140,11 +163,7 @@ public final class LoadGenerator { (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY)); return new LoadGenerator() - .setClusterConfig(new ClusterConfig( - bootstrapServer, - port, - portAutoIncrement, - clusterNamePrefix)) + .setClusterConfig(clusterConfig) .setLoadDefinition(new WorkloadDefinition( new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors), Duration.ofMillis(periodMs)))