diff --git a/execution/infrastructure/kafka/values.yaml b/execution/infrastructure/kafka/values.yaml index e65a5fc567d39c7389479d406fa9e6d7156b0f0a..d20b0491844f714c185bf966e2d5fdb54d0dc50d 100644 --- a/execution/infrastructure/kafka/values.yaml +++ b/execution/infrastructure/kafka/values.yaml @@ -3,7 +3,7 @@ ## ------------------------------------------------------ cp-zookeeper: enabled: true - servers: 3 + servers: 1 image: confluentinc/cp-zookeeper imageTag: 5.4.0 ## Optionally specify an array of imagePullSecrets. Secrets must be manually created in the namespace. @@ -28,7 +28,7 @@ cp-zookeeper: ## ------------------------------------------------------ cp-kafka: enabled: true - brokers: 10 + brokers: 1 image: confluentinc/cp-enterprise-kafka imageTag: 5.4.0 ## Optionally specify an array of imagePullSecrets. Secrets must be manually created in the namespace. @@ -48,7 +48,7 @@ cp-kafka: # cpu: 100m # memory: 128Mi configurationOverrides: - #"offsets.topic.replication.factor": "3" + offsets.topic.replication.factor: "1" "message.max.bytes": "134217728" # 128 MB "replica.fetch.max.bytes": "134217728" # 128 MB # "default.replication.factor": 3 diff --git a/execution/infrastructure/kafka/values_new.yaml b/execution/infrastructure/kafka/values_new.yaml new file mode 100644 index 0000000000000000000000000000000000000000..41b111f32851ecea0be67bafadb95594f3119cbd --- /dev/null +++ b/execution/infrastructure/kafka/values_new.yaml @@ -0,0 +1,15 @@ +cp-helm-charts: + ## ------------------------------------------------------ + ## Zookeeper + ## ------------------------------------------------------ + cp-zookeeper: + servers: 1 # default: 3 + + ## ------------------------------------------------------ + ## Kafka + ## ------------------------------------------------------ + cp-kafka: + brokers: 1 # deauflt: 10 + + configurationOverrides: + offsets.topic.replication.factor: "3" \ No newline at end of file diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt index 2abfd8ce8b012a6f9738b4c384cc5cff33382509..7fdd3990ce313a5a77e37aa9215a7dbd1742ed24 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt @@ -5,39 +5,52 @@ import theodolite.k8s.UC1Benchmark import theodolite.strategies.restriction.LowerBoundRestriction import theodolite.strategies.searchstrategy.CompositeStrategy import theodolite.strategies.searchstrategy.LinearSearch -import theodolite.util.* +import theodolite.util.Config +import theodolite.util.LoadDimension +import theodolite.util.Resource +import theodolite.util.Results import java.time.Duration private val logger = KotlinLogging.logger {} class TheodoliteExecutor() { + val path = "/home/lorenz/git/spesb/theodolite-quarkus/src/main/resources/yaml" private fun loadConfig(): Config { val benchmark: UC1Benchmark = UC1Benchmark( - UC1Benchmark.UC1BenchmarkConfig( - zookeeperConnectionString = "my-confluent-cp-zookeeper:2181", - kafkaIPConnectionString = "my-confluent-cp-kafka:9092", + UC1Benchmark.UC1BenchmarkConfig( // use port forward 2181 -> 2181 + zookeeperConnectionString = "localhost:2181", //"my-confluent-cp-zookeeper:2181", //localhost:2181. + kafkaIPConnectionString = "localhost:9092",//"my-confluent-cp-kafka:","178.18.0." schemaRegistryConnectionString = "http://my-confluent-cp-schema-registry:8081", kafkaPartition = 40, - kafkaReplication = 3, + kafkaReplication = 1, kafkaTopics = listOf("input", "output"), // TODO("handle path in a more nice way (not absolut)") - ucDeploymentPath = "src/main/resources/yaml/aggregation-deployment.yaml", - ucServicePath = "src/main/resources/yaml/aggregation-service.yaml", - wgDeploymentPath = "src/main/resources/yaml/workloadGenerator.yaml", + ucDeploymentPath = path + "/aggregation-deployment.yaml", + ucServicePath = path + "/aggregation-service.yaml", + wgDeploymentPath = path + "/workloadGenerator.yaml", + configMapPath = path + "/jmx-configmap.yaml", ucImageURL = "ghcr.io/cau-se/theodolite-uc1-kstreams-app:latest", - wgImageURL = "ghcr.io/cau-se/theodolite-uc1-kstreams-workload-generator:latest" - )) + wgImageURL = "ghcr.io/cau-se/theodolite-uc1-workload-generator:theodolite-kotlin-latest" + ) + ) val results: Results = Results() - val executionDuration = Duration.ofSeconds(60*5) + + val executionDuration = Duration.ofSeconds(60 * 5) + val executor: BenchmarkExecutor = KafkaBenchmarkExecutor(benchmark, results, executionDuration) val restrictionStrategy = LowerBoundRestriction(results) val searchStrategy = LinearSearch(executor, results) return Config( - loads = (0..6).map{ number -> LoadDimension(number) }, - resources = (0..6).map{ number -> Resource(number) }, - compositeStrategy = CompositeStrategy(executor, searchStrategy, restrictionStrategies = setOf(restrictionStrategy), results = results), + loads = (1..6).map { number -> LoadDimension(number) }, + resources = (1..6).map { number -> Resource(number) }, + compositeStrategy = CompositeStrategy( + executor, + searchStrategy, + restrictionStrategies = setOf(restrictionStrategy), + results = results + ), executionDuration = executionDuration ) } @@ -47,9 +60,8 @@ class TheodoliteExecutor() { val config = this.loadConfig() // execute benchmarks for each load - for(load in config.loads) { + for (load in config.loads) { config.compositeStrategy.findSuitableResource(load, config.resources) } - } -} \ No newline at end of file +} diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/ConfigMapManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/ConfigMapManager.kt index fe11085c6060e7bca099e6ab27bcd0480cc41168..7fbf604c47b568881b068a798d4cc32e2afefa21 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/ConfigMapManager.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/ConfigMapManager.kt @@ -11,7 +11,7 @@ class ConfigMapManager(client: NamespacedKubernetesClient) { } fun deploy(configMap: ConfigMap) { - client.configMaps().create(configMap) + client.configMaps().createOrReplace(configMap) } fun delete(configMap: ConfigMap) { diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/DeploymentManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/DeploymentManager.kt index f885b3bf2a69bc19b6bb93f4d35ac624e53d93a1..9f6f3ab7a716111629a7cd1df78c5c715f09134e 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/DeploymentManager.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/DeploymentManager.kt @@ -75,7 +75,7 @@ class DeploymentManager(client: NamespacedKubernetesClient) { // TODO potential add exception handling fun deploy(deployment: Deployment) { - client.apps().deployments().create(deployment) + client.apps().deployments().createOrReplace(deployment) } // TODO potential add exception handling diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/ServiceManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/ServiceManager.kt index 16e5bbd646d99f4155bd1b232d148be463a8c37c..ed262d57ac952fb298a66ea98a5882ba37af79f7 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/ServiceManager.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/ServiceManager.kt @@ -18,7 +18,7 @@ class ServiceManager(client: NamespacedKubernetesClient) { } fun deploy(service: Service) { - client.services().create(service) + client.services().createOrReplace(service) } fun delete(service: Service) { diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/UC1Benchmark.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/UC1Benchmark.kt index 128fefd2a27010f444dfaf01399d3dadadfa3e51..645c2acdd142bdb2d949419ec4e3ea00e7e87e4f 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/UC1Benchmark.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/UC1Benchmark.kt @@ -1,34 +1,45 @@ package theodolite.k8s +import io.fabric8.kubernetes.api.model.ConfigMap import io.fabric8.kubernetes.api.model.Service import io.fabric8.kubernetes.api.model.apps.Deployment import io.fabric8.kubernetes.client.DefaultKubernetesClient import io.fabric8.kubernetes.client.NamespacedKubernetesClient +import mu.KotlinLogging import theodolite.util.Benchmark import theodolite.util.LoadDimension import theodolite.util.Resource +private val logger = KotlinLogging.logger {} + class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) { val workloadGeneratorStateCleaner: WorkloadGeneratorStateCleaner val topicManager: TopicManager + // TODO("service monitor") val kubernetesClient: NamespacedKubernetesClient val yamlLoader: YamlLoader val deploymentManager: DeploymentManager val serviceManager: ServiceManager + val configMapManager: ConfigMapManager var ucDeployment: Deployment var ucService: Service + var wgDeployment: Deployment + var configMap: ConfigMap init { this.workloadGeneratorStateCleaner = WorkloadGeneratorStateCleaner(this.config.zookeeperConnectionString) this.topicManager = TopicManager(this.config.kafkaIPConnectionString) - this.kubernetesClient = DefaultKubernetesClient() + this.kubernetesClient = DefaultKubernetesClient().inNamespace("default") this.yamlLoader = YamlLoader(this.kubernetesClient) this.deploymentManager = DeploymentManager(this.kubernetesClient) this.serviceManager = ServiceManager(this.kubernetesClient) + this.configMapManager = ConfigMapManager(this.kubernetesClient) ucDeployment = this.yamlLoader.loadDeployment(this.config.ucDeploymentPath) ucService = this.yamlLoader.loadService(this.config.ucServicePath) + wgDeployment = this.yamlLoader.loadDeployment(this.config.wgDeploymentPath) + configMap = this.yamlLoader.loadConfigmap(this.config.configMapPath) } override fun clearClusterEnvironment() { @@ -36,12 +47,19 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) { this.topicManager.deleteTopics(this.config.kafkaTopics) this.deploymentManager.delete(this.ucDeployment) this.serviceManager.delete(this.ucService) + this.deploymentManager.delete(this.wgDeployment) } override fun initializeClusterEnvironment() { - this.workloadGeneratorStateCleaner.deleteAll() + // this.workloadGeneratorStateCleaner.deleteAll() + // since the workloadGenerators are not started they cant be deleted + this.topicManager.deleteTopics(this.config.kafkaTopics) - this.topicManager.createTopics(this.config.kafkaTopics, this.config.kafkaPartition, this.config.kafkaReplication) + this.topicManager.createTopics( + this.config.kafkaTopics, + this.config.kafkaPartition, + this.config.kafkaReplication + ) } override fun startSUT(resources: Resource) { @@ -49,30 +67,36 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) { // set environment variables val environmentVariables: MutableMap<String, String> = mutableMapOf() - environmentVariables.put("KAFKA_BOOTSTRAP_SERVER", this.config.kafkaIPConnectionString) - environmentVariables.put("SCHEMA_REGISTRY_URL", this.config.schemaRegistryConnectionString) + //environmentVariables.put("KAFKA_BOOTSTRAP_SERVERS", this.config.kafkaIPConnectionString) + //environmentVariables.put("SCHEMA_REGISTRY_URL", this.config.schemaRegistryConnectionString) // setup deployment this.deploymentManager.setReplica(ucDeployment, resources.get()) - this.deploymentManager.setWorkloadEnv(ucDeployment,"uc-application", environmentVariables) + this.deploymentManager.setWorkloadEnv(ucDeployment, "uc-application", environmentVariables) + // create kubernetes resources this.deploymentManager.deploy(ucDeployment) this.serviceManager.deploy(ucService) + this.configMapManager.deploy(configMap) } override fun startWorkloadGenerator(load: LoadDimension) { - this.deploymentManager.setImageName(ucDeployment, "workload-generator", this.config.wgImageURL) - val wgDeployment = this.yamlLoader.loadDeployment(this.config.wgDeploymentPath) + this.deploymentManager.setImageName(wgDeployment, "workload-generator", this.config.wgImageURL) + // TODO ("calculate number of required instances") val requiredInstances: Int = 1 val environmentVariables: MutableMap<String, String> = mutableMapOf() + //environmentVariables.put("KAFKA_BOOTSTRAP_SERVERS", this.config.kafkaIPConnectionString) + //environmentVariables.put("ZK_HOST", this.config.zookeeperConnectionString.split(":")[0]) + //environmentVariables.put("ZK_PORT", this.config.zookeeperConnectionString.split(":")[1]) environmentVariables.put("NUM_SENSORS", load.get().toString()) - environmentVariables.put("NUM_INSTANCES", requiredInstances.toString()) + environmentVariables.put("INSTANCES", requiredInstances.toString()) - this.deploymentManager.setWorkloadEnv(wgDeployment, "workload-generator", environmentVariables) + this.deploymentManager.setWorkloadEnv(this.wgDeployment, "workload-generator", environmentVariables) + this.deploymentManager.deploy(this.wgDeployment) } data class UC1BenchmarkConfig( @@ -84,8 +108,9 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) { val kafkaPartition: Int, val ucDeploymentPath: String, val ucServicePath: String, + val configMapPath: String, val wgDeploymentPath: String, val ucImageURL: String, val wgImageURL: String - ) {} -} \ No newline at end of file + ) {} +} diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/util/TestBenchmark.kt b/theodolite-quarkus/src/main/kotlin/theodolite/util/TestBenchmark.kt index faf7bf5f70fe574a48cb82074316cf2fe69414b6..3e6cf2396708d4af72c178a81517a2c620ad4061 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/util/TestBenchmark.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/util/TestBenchmark.kt @@ -2,19 +2,22 @@ package theodolite.util import theodolite.k8s.UC1Benchmark -class TestBenchmark: Benchmark(UC1Benchmark.UC1BenchmarkConfig( - zookeeperConnectionString = "", - kafkaIPConnectionString = "", - schemaRegistryConnectionString = "", - kafkaTopics = emptyList(), - kafkaReplication = 0, - kafkaPartition = 0, - ucServicePath = "", - ucDeploymentPath = "", - wgDeploymentPath = "", - ucImageURL = "", - wgImageURL = "" -)){ +class TestBenchmark : Benchmark( + UC1Benchmark.UC1BenchmarkConfig( + zookeeperConnectionString = "", + kafkaIPConnectionString = "", + schemaRegistryConnectionString = "", + kafkaTopics = emptyList(), + kafkaReplication = 0, + kafkaPartition = 0, + ucServicePath = "", + ucDeploymentPath = "", + wgDeploymentPath = "", + configMapPath = "", + ucImageURL = "", + wgImageURL = "" + ) +) { override fun initializeClusterEnvironment() { TODO("Not yet implemented") @@ -31,4 +34,4 @@ class TestBenchmark: Benchmark(UC1Benchmark.UC1BenchmarkConfig( override fun startWorkloadGenerator(load: LoadDimension) { TODO("Not yet implemented") } -} \ No newline at end of file +} diff --git a/theodolite-quarkus/src/main/resources/yaml/workloadGenerator.yaml b/theodolite-quarkus/src/main/resources/yaml/workloadGenerator.yaml index 794468b18dc74ca09872577b5b3c115605bd4620..242ce5f2dc0eb9523a530710c5774f440627a8f6 100644 --- a/theodolite-quarkus/src/main/resources/yaml/workloadGenerator.yaml +++ b/theodolite-quarkus/src/main/resources/yaml/workloadGenerator.yaml @@ -14,25 +14,21 @@ spec: spec: terminationGracePeriodSeconds: 0 containers: - - name: workload-generator - image: workload-generator:latest - env: - # Order need to be preserved for run_uc.py - - name: NUM_SENSORS - value: "25000" - - name: INSTANCES - value: "1" - - name: NUM_NESTED_GROUPS - value: "5" - - name: ZK_HOST - value: "my-confluent-cp-zookeeper" - - name: ZK_PORT - value: "2181" - - name: KAFKA_BOOTSTRAP_SERVERS - value: "my-confluent-cp-kafka:9092" - - name: SCHEMA_REGISTRY_URL - value: "http://my-confluent-cp-schema-registry:8081" - - name: POD_NAME - valueFrom: - fieldRef: - fieldPath: metadata.name + - name: workload-generator + image: workload-generator:latest + env: + # Order need to be preserved for run_uc.py + - name: NUM_SENSORS + value: "25000" + - name: INSTANCES + value: "1" + - name: NUM_NESTED_GROUPS + value: "5" + - name: ZK_HOST + value: "my-confluent-cp-zookeeper" + - name: ZK_PORT + value: "2181" + - name: KAFKA_BOOTSTRAP_SERVERS + value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" \ No newline at end of file