Skip to content
Snippets Groups Projects
Commit ff796e51 authored by Sören Henning's avatar Sören Henning
Browse files

Add support for Kubernetes auto discovery

parent d3759416
No related branches found
No related tags found
No related merge requests found
dependencies { dependencies {
implementation 'com.google.guava:guava:30.1-jre' implementation 'com.google.guava:guava:30.1-jre'
implementation 'com.hazelcast:hazelcast:4.1.1' implementation 'com.hazelcast:hazelcast:4.1.1'
implementation 'com.hazelcast:hazelcast-kubernetes:4.1.1'
} }
\ No newline at end of file
...@@ -3,43 +3,46 @@ package theodolite.commons.workloadgeneration; ...@@ -3,43 +3,46 @@ package theodolite.commons.workloadgeneration;
/** /**
* Configuration of a load generator cluster. * Configuration of a load generator cluster.
*/ */
public class ClusterConfig { public final class ClusterConfig {
public static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; /*
* public static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; public static final int
* PORT_DEFAULT = 5701; public static final boolean PORT_AUTO_INCREMENT_DEFAULT = true; public
* static final String CLUSTER_NAME_PREFIX_DEFAULT = "theodolite-load-generation";
*/
public static final int PORT_DEFAULT = 5701; public static final int PORT_DEFAULT = 5701;
public static final boolean PORT_AUTO_INCREMENT_DEFAULT = true;
public static final String CLUSTER_NAME_PREFIX_DEFAULT = "theodolite-load-generation"; public static final String CLUSTER_NAME_PREFIX_DEFAULT = "theodolite-load-generation";
private final String bootstrapServer; private final String bootstrapServer;
private final int port; private final String kubernetesDnsName;
private final boolean portAutoIncrement; private int port = PORT_DEFAULT;
private final String clusterNamePrefix; private boolean portAutoIncrement = true;
private String clusterNamePrefix = CLUSTER_NAME_PREFIX_DEFAULT;
/**
* Create a new {@link ClusterConfig} with its default values.
*/
public ClusterConfig() {
this(
BOOTSTRAP_SERVER_DEFAULT,
PORT_DEFAULT,
PORT_AUTO_INCREMENT_DEFAULT,
CLUSTER_NAME_PREFIX_DEFAULT);
}
/** /**
* Create a new {@link ClusterConfig} with the given parameter values. * Create a new {@link ClusterConfig} with the given parameter values.
*/ */
public ClusterConfig(final String bootstrapServer, final int port, private ClusterConfig(final String bootstrapServer, final String kubernetesDnsName) {
final boolean portAutoIncrement, final String clusterNamePrefix) {
this.bootstrapServer = bootstrapServer; this.bootstrapServer = bootstrapServer;
this.port = port; this.kubernetesDnsName = kubernetesDnsName;
this.portAutoIncrement = portAutoIncrement; }
this.clusterNamePrefix = clusterNamePrefix;
public boolean hasBootstrapServer() {
return this.bootstrapServer != null;
} }
public String getBootstrapServer() { public String getBootstrapServer() {
return this.bootstrapServer; return this.bootstrapServer;
} }
public boolean hasKubernetesDnsName() {
return this.kubernetesDnsName != null;
}
public String getKubernetesDnsName() {
return this.kubernetesDnsName;
}
public int getPort() { public int getPort() {
return this.port; return this.port;
} }
...@@ -48,9 +51,31 @@ public class ClusterConfig { ...@@ -48,9 +51,31 @@ public class ClusterConfig {
return this.portAutoIncrement; return this.portAutoIncrement;
} }
public ClusterConfig setPortAutoIncrement(final boolean portAutoIncrement) { // NOPMD
this.portAutoIncrement = portAutoIncrement;
return this;
}
public ClusterConfig setPort(final int port) { // NOPMD
this.port = port;
return this;
}
public String getClusterNamePrefix() { public String getClusterNamePrefix() {
return this.clusterNamePrefix; return this.clusterNamePrefix;
} }
public ClusterConfig setClusterNamePrefix(final String clusterNamePrefix) { // NOPMD
this.clusterNamePrefix = clusterNamePrefix;
return this;
}
public static ClusterConfig fromBootstrapServer(final String bootstrapServer) {
return new ClusterConfig(bootstrapServer, null);
}
public static ClusterConfig fromKubernetesDnsName(final String kubernetesDnsName) {
return new ClusterConfig(null, kubernetesDnsName);
}
} }
...@@ -7,6 +7,8 @@ public final class ConfigurationKeys { ...@@ -7,6 +7,8 @@ public final class ConfigurationKeys {
public static final String BOOTSTRAP_SERVER = "BOOTSTRAP_SERVER"; public static final String BOOTSTRAP_SERVER = "BOOTSTRAP_SERVER";
public static final String KUBERNETES_DNS_NAME = "KUBERNETES_DNS_NAME";
public static final String PORT = "PORT"; public static final String PORT = "PORT";
public static final String PORT_AUTO_INCREMENT = "PORT_AUTO_INCREMENT"; public static final String PORT_AUTO_INCREMENT = "PORT_AUTO_INCREMENT";
......
...@@ -15,6 +15,7 @@ import java.util.concurrent.CompletableFuture; ...@@ -15,6 +15,7 @@ import java.util.concurrent.CompletableFuture;
*/ */
public class HazelcastRunner { public class HazelcastRunner {
private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns";
private final HazelcastInstance hzInstance; private final HazelcastInstance hzInstance;
private volatile HazelcastRunnerStateInstance runnerState; private volatile HazelcastRunnerStateInstance runnerState;
private final CompletableFuture<Void> stopAction = new CompletableFuture<>(); private final CompletableFuture<Void> stopAction = new CompletableFuture<>();
...@@ -92,8 +93,13 @@ public class HazelcastRunner { ...@@ -92,8 +93,13 @@ public class HazelcastRunner {
.setPortAutoIncrement(cluster.isPortAutoIncrement()) .setPortAutoIncrement(cluster.isPortAutoIncrement())
.getJoin(); .getJoin();
joinConfig.getMulticastConfig().setEnabled(false); joinConfig.getMulticastConfig().setEnabled(false);
// joinConfig.getKubernetesConfig().setEnabled(true); if (cluster.hasBootstrapServer()) {
joinConfig.getTcpIpConfig().addMember(cluster.getBootstrapServer()); joinConfig.getTcpIpConfig().addMember(cluster.getBootstrapServer());
} else if (cluster.hasKubernetesDnsName()) {
joinConfig.getKubernetesConfig()
.setEnabled(true)
.setProperty(HZ_KUBERNETES_SERVICE_DNS_KEY, cluster.getKubernetesDnsName());
}
return Hazelcast.newHazelcastInstance(config); return Hazelcast.newHazelcastInstance(config);
} }
......
...@@ -4,12 +4,17 @@ import java.time.Duration; ...@@ -4,12 +4,17 @@ import java.time.Duration;
import java.util.Objects; import java.util.Objects;
import java.util.Properties; import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* A Theodolite load generator. * A Theodolite load generator.
*/ */
public final class LoadGenerator { public final class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701";
private static final String SENSOR_PREFIX_DEFAULT = "s_"; private static final String SENSOR_PREFIX_DEFAULT = "s_";
private static final int NUMBER_OF_KEYS_DEFAULT = 10; private static final int NUMBER_OF_KEYS_DEFAULT = 10;
private static final int PERIOD_MS_DEFAULT = 1000; private static final int PERIOD_MS_DEFAULT = 1000;
...@@ -17,7 +22,7 @@ public final class LoadGenerator { ...@@ -17,7 +22,7 @@ public final class LoadGenerator {
private static final int THREADS_DEFAULT = 4; private static final int THREADS_DEFAULT = 4;
private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
private static final String KAFKA_TOPIC_DEFAULT = "input"; private static final String KAFKA_TOPIC_DEFAULT = "input";
private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:19092"; // NOPMD
private ClusterConfig clusterConfig; private ClusterConfig clusterConfig;
private WorkloadDefinition loadDefinition; private WorkloadDefinition loadDefinition;
...@@ -81,7 +86,7 @@ public final class LoadGenerator { ...@@ -81,7 +86,7 @@ public final class LoadGenerator {
*/ */
public static LoadGenerator fromDefaults() { public static LoadGenerator fromDefaults() {
return new LoadGenerator() return new LoadGenerator()
.setClusterConfig(new ClusterConfig()) .setClusterConfig(ClusterConfig.fromBootstrapServer(BOOTSTRAP_SERVER_DEFAULT))
.setLoadDefinition(new WorkloadDefinition( .setLoadDefinition(new WorkloadDefinition(
new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT), new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT),
Duration.ofMillis(PERIOD_MS_DEFAULT))) Duration.ofMillis(PERIOD_MS_DEFAULT)))
...@@ -98,18 +103,36 @@ public final class LoadGenerator { ...@@ -98,18 +103,36 @@ public final class LoadGenerator {
* Create a basic {@link LoadGenerator} from environment variables. * Create a basic {@link LoadGenerator} from environment variables.
*/ */
public static LoadGenerator fromEnvironment() { public static LoadGenerator fromEnvironment() {
final String bootstrapServer = Objects.requireNonNullElse( final String bootstrapServer = System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER);
System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER), final String kubernetesDnsName = System.getenv(ConfigurationKeys.KUBERNETES_DNS_NAME);
ClusterConfig.BOOTSTRAP_SERVER_DEFAULT);
final int port = Integer.parseInt(Objects.requireNonNullElse( ClusterConfig clusterConfig;
System.getenv(ConfigurationKeys.PORT), if (bootstrapServer != null) { // NOPMD
Integer.toString(ClusterConfig.PORT_DEFAULT))); clusterConfig = ClusterConfig.fromBootstrapServer(bootstrapServer);
final boolean portAutoIncrement = Boolean.parseBoolean(Objects.requireNonNullElse( LOGGER.info("Use bootstrap server '{}'.", bootstrapServer);
System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT), } else if (kubernetesDnsName != null) { // NOPMD
Boolean.toString(ClusterConfig.PORT_AUTO_INCREMENT_DEFAULT))); clusterConfig = ClusterConfig.fromKubernetesDnsName(kubernetesDnsName);
final String clusterNamePrefix = Objects.requireNonNullElse( LOGGER.info("Use Kubernetes DNS name '{}'.", kubernetesDnsName);
System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX), } else {
ClusterConfig.CLUSTER_NAME_PREFIX_DEFAULT); throw new IllegalArgumentException(
"Neither a bootstrap server nor a Kubernetes DNS name was provided.");
}
final String port = System.getenv(ConfigurationKeys.PORT);
if (port != null) {
clusterConfig.setPort(Integer.parseInt(port));
}
final String portAutoIncrement = System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT);
if (portAutoIncrement != null) {
clusterConfig.setPortAutoIncrement(Boolean.parseBoolean(portAutoIncrement));
}
final String clusterNamePrefix = System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX);
if (clusterNamePrefix != null) {
clusterConfig.setClusterNamePrefix(portAutoIncrement);
}
final int numSensors = Integer.parseInt(Objects.requireNonNullElse( final int numSensors = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.NUM_SENSORS), System.getenv(ConfigurationKeys.NUM_SENSORS),
Integer.toString(NUMBER_OF_KEYS_DEFAULT))); Integer.toString(NUMBER_OF_KEYS_DEFAULT)));
...@@ -140,11 +163,7 @@ public final class LoadGenerator { ...@@ -140,11 +163,7 @@ public final class LoadGenerator {
(k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY)); (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY));
return new LoadGenerator() return new LoadGenerator()
.setClusterConfig(new ClusterConfig( .setClusterConfig(clusterConfig)
bootstrapServer,
port,
portAutoIncrement,
clusterNamePrefix))
.setLoadDefinition(new WorkloadDefinition( .setLoadDefinition(new WorkloadDefinition(
new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors), new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors),
Duration.ofMillis(periodMs))) Duration.ofMillis(periodMs)))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment