Skip to content
Snippets Groups Projects
Commit 6fca0275 authored by Sören Henning's avatar Sören Henning
Browse files

Add support for Kubernetes auto discovery

parent 2881e222
No related branches found
No related tags found
2 merge requests!86Zookeeper free workload generator,!84Gitlab CI for Theodolite-Kotlin-Quarkus
Pipeline #2010 passed with warnings
dependencies {
implementation 'com.google.guava:guava:30.1-jre'
implementation 'com.hazelcast:hazelcast:4.1.1'
implementation 'com.hazelcast:hazelcast-kubernetes:4.1.1'
}
\ No newline at end of file
......@@ -3,43 +3,46 @@ package theodolite.commons.workloadgeneration;
/**
* Configuration of a load generator cluster.
*/
public class ClusterConfig {
public static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701";
public final class ClusterConfig {
/*
* public static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; public static final int
* PORT_DEFAULT = 5701; public static final boolean PORT_AUTO_INCREMENT_DEFAULT = true; public
* static final String CLUSTER_NAME_PREFIX_DEFAULT = "theodolite-load-generation";
*/
public static final int PORT_DEFAULT = 5701;
public static final boolean PORT_AUTO_INCREMENT_DEFAULT = true;
public static final String CLUSTER_NAME_PREFIX_DEFAULT = "theodolite-load-generation";
private final String bootstrapServer;
private final int port;
private final boolean portAutoIncrement;
private final String clusterNamePrefix;
/**
* Create a new {@link ClusterConfig} with its default values.
*/
public ClusterConfig() {
this(
BOOTSTRAP_SERVER_DEFAULT,
PORT_DEFAULT,
PORT_AUTO_INCREMENT_DEFAULT,
CLUSTER_NAME_PREFIX_DEFAULT);
}
private final String kubernetesDnsName;
private int port = PORT_DEFAULT;
private boolean portAutoIncrement = true;
private String clusterNamePrefix = CLUSTER_NAME_PREFIX_DEFAULT;
/**
* Create a new {@link ClusterConfig} with the given parameter values.
*/
public ClusterConfig(final String bootstrapServer, final int port,
final boolean portAutoIncrement, final String clusterNamePrefix) {
private ClusterConfig(final String bootstrapServer, final String kubernetesDnsName) {
this.bootstrapServer = bootstrapServer;
this.port = port;
this.portAutoIncrement = portAutoIncrement;
this.clusterNamePrefix = clusterNamePrefix;
this.kubernetesDnsName = kubernetesDnsName;
}
public boolean hasBootstrapServer() {
return this.bootstrapServer != null;
}
public String getBootstrapServer() {
return this.bootstrapServer;
}
public boolean hasKubernetesDnsName() {
return this.kubernetesDnsName != null;
}
public String getKubernetesDnsName() {
return this.kubernetesDnsName;
}
public int getPort() {
return this.port;
}
......@@ -48,9 +51,31 @@ public class ClusterConfig {
return this.portAutoIncrement;
}
public ClusterConfig setPortAutoIncrement(final boolean portAutoIncrement) { // NOPMD
this.portAutoIncrement = portAutoIncrement;
return this;
}
public ClusterConfig setPort(final int port) { // NOPMD
this.port = port;
return this;
}
public String getClusterNamePrefix() {
return this.clusterNamePrefix;
}
public ClusterConfig setClusterNamePrefix(final String clusterNamePrefix) { // NOPMD
this.clusterNamePrefix = clusterNamePrefix;
return this;
}
public static ClusterConfig fromBootstrapServer(final String bootstrapServer) {
return new ClusterConfig(bootstrapServer, null);
}
public static ClusterConfig fromKubernetesDnsName(final String kubernetesDnsName) {
return new ClusterConfig(null, kubernetesDnsName);
}
}
......@@ -7,6 +7,8 @@ public final class ConfigurationKeys {
public static final String BOOTSTRAP_SERVER = "BOOTSTRAP_SERVER";
public static final String KUBERNETES_DNS_NAME = "KUBERNETES_DNS_NAME";
public static final String PORT = "PORT";
public static final String PORT_AUTO_INCREMENT = "PORT_AUTO_INCREMENT";
......
......@@ -15,6 +15,7 @@ import java.util.concurrent.CompletableFuture;
*/
public class HazelcastRunner {
private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns";
private final HazelcastInstance hzInstance;
private volatile HazelcastRunnerStateInstance runnerState;
private final CompletableFuture<Void> stopAction = new CompletableFuture<>();
......@@ -92,8 +93,13 @@ public class HazelcastRunner {
.setPortAutoIncrement(cluster.isPortAutoIncrement())
.getJoin();
joinConfig.getMulticastConfig().setEnabled(false);
// joinConfig.getKubernetesConfig().setEnabled(true);
joinConfig.getTcpIpConfig().addMember(cluster.getBootstrapServer());
if (cluster.hasBootstrapServer()) {
joinConfig.getTcpIpConfig().addMember(cluster.getBootstrapServer());
} else if (cluster.hasKubernetesDnsName()) {
joinConfig.getKubernetesConfig()
.setEnabled(true)
.setProperty(HZ_KUBERNETES_SERVICE_DNS_KEY, cluster.getKubernetesDnsName());
}
return Hazelcast.newHazelcastInstance(config);
}
......
......@@ -4,12 +4,17 @@ import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Theodolite load generator.
*/
public final class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701";
private static final String SENSOR_PREFIX_DEFAULT = "s_";
private static final int NUMBER_OF_KEYS_DEFAULT = 10;
private static final int PERIOD_MS_DEFAULT = 1000;
......@@ -17,7 +22,7 @@ public final class LoadGenerator {
private static final int THREADS_DEFAULT = 4;
private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
private static final String KAFKA_TOPIC_DEFAULT = "input";
private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD
private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:19092"; // NOPMD
private ClusterConfig clusterConfig;
private WorkloadDefinition loadDefinition;
......@@ -81,7 +86,7 @@ public final class LoadGenerator {
*/
public static LoadGenerator fromDefaults() {
return new LoadGenerator()
.setClusterConfig(new ClusterConfig())
.setClusterConfig(ClusterConfig.fromBootstrapServer(BOOTSTRAP_SERVER_DEFAULT))
.setLoadDefinition(new WorkloadDefinition(
new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT),
Duration.ofMillis(PERIOD_MS_DEFAULT)))
......@@ -98,18 +103,36 @@ public final class LoadGenerator {
* Create a basic {@link LoadGenerator} from environment variables.
*/
public static LoadGenerator fromEnvironment() {
final String bootstrapServer = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER),
ClusterConfig.BOOTSTRAP_SERVER_DEFAULT);
final int port = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.PORT),
Integer.toString(ClusterConfig.PORT_DEFAULT)));
final boolean portAutoIncrement = Boolean.parseBoolean(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT),
Boolean.toString(ClusterConfig.PORT_AUTO_INCREMENT_DEFAULT)));
final String clusterNamePrefix = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX),
ClusterConfig.CLUSTER_NAME_PREFIX_DEFAULT);
final String bootstrapServer = System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER);
final String kubernetesDnsName = System.getenv(ConfigurationKeys.KUBERNETES_DNS_NAME);
ClusterConfig clusterConfig;
if (bootstrapServer != null) { // NOPMD
clusterConfig = ClusterConfig.fromBootstrapServer(bootstrapServer);
LOGGER.info("Use bootstrap server '{}'.", bootstrapServer);
} else if (kubernetesDnsName != null) { // NOPMD
clusterConfig = ClusterConfig.fromKubernetesDnsName(kubernetesDnsName);
LOGGER.info("Use Kubernetes DNS name '{}'.", kubernetesDnsName);
} else {
throw new IllegalArgumentException(
"Neither a bootstrap server nor a Kubernetes DNS name was provided.");
}
final String port = System.getenv(ConfigurationKeys.PORT);
if (port != null) {
clusterConfig.setPort(Integer.parseInt(port));
}
final String portAutoIncrement = System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT);
if (portAutoIncrement != null) {
clusterConfig.setPortAutoIncrement(Boolean.parseBoolean(portAutoIncrement));
}
final String clusterNamePrefix = System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX);
if (clusterNamePrefix != null) {
clusterConfig.setClusterNamePrefix(portAutoIncrement);
}
final int numSensors = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.NUM_SENSORS),
Integer.toString(NUMBER_OF_KEYS_DEFAULT)));
......@@ -140,11 +163,7 @@ public final class LoadGenerator {
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY));
return new LoadGenerator()
.setClusterConfig(new ClusterConfig(
bootstrapServer,
port,
portAutoIncrement,
clusterNamePrefix))
.setClusterConfig(clusterConfig)
.setLoadDefinition(new WorkloadDefinition(
new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors),
Duration.ofMillis(periodMs)))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment