Skip to content
Snippets Groups Projects
Commit 8ce82904 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Merge branch 'feature/UC1Benchmark' into 109-implement-kotlin-prototype

parents d41f4c90 3889bc89
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus,!78Resolve "Implement Quarkus/Kotlin protype"
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
## ------------------------------------------------------ ## ------------------------------------------------------
cp-zookeeper: cp-zookeeper:
enabled: true enabled: true
servers: 3 servers: 1
image: confluentinc/cp-zookeeper image: confluentinc/cp-zookeeper
imageTag: 5.4.0 imageTag: 5.4.0
## Optionally specify an array of imagePullSecrets. Secrets must be manually created in the namespace. ## Optionally specify an array of imagePullSecrets. Secrets must be manually created in the namespace.
...@@ -28,7 +28,7 @@ cp-zookeeper: ...@@ -28,7 +28,7 @@ cp-zookeeper:
## ------------------------------------------------------ ## ------------------------------------------------------
cp-kafka: cp-kafka:
enabled: true enabled: true
brokers: 10 brokers: 1
image: confluentinc/cp-enterprise-kafka image: confluentinc/cp-enterprise-kafka
imageTag: 5.4.0 imageTag: 5.4.0
## Optionally specify an array of imagePullSecrets. Secrets must be manually created in the namespace. ## Optionally specify an array of imagePullSecrets. Secrets must be manually created in the namespace.
...@@ -48,7 +48,7 @@ cp-kafka: ...@@ -48,7 +48,7 @@ cp-kafka:
# cpu: 100m # cpu: 100m
# memory: 128Mi # memory: 128Mi
configurationOverrides: configurationOverrides:
#"offsets.topic.replication.factor": "3" offsets.topic.replication.factor: "1"
"message.max.bytes": "134217728" # 128 MB "message.max.bytes": "134217728" # 128 MB
"replica.fetch.max.bytes": "134217728" # 128 MB "replica.fetch.max.bytes": "134217728" # 128 MB
# "default.replication.factor": 3 # "default.replication.factor": 3
......
cp-helm-charts:
## ------------------------------------------------------
## Zookeeper
## ------------------------------------------------------
cp-zookeeper:
servers: 1 # default: 3
## ------------------------------------------------------
## Kafka
## ------------------------------------------------------
cp-kafka:
brokers: 1 # deauflt: 10
configurationOverrides:
offsets.topic.replication.factor: "3"
\ No newline at end of file
...@@ -5,39 +5,52 @@ import theodolite.k8s.UC1Benchmark ...@@ -5,39 +5,52 @@ import theodolite.k8s.UC1Benchmark
import theodolite.strategies.restriction.LowerBoundRestriction import theodolite.strategies.restriction.LowerBoundRestriction
import theodolite.strategies.searchstrategy.CompositeStrategy import theodolite.strategies.searchstrategy.CompositeStrategy
import theodolite.strategies.searchstrategy.LinearSearch import theodolite.strategies.searchstrategy.LinearSearch
import theodolite.util.* import theodolite.util.Config
import theodolite.util.LoadDimension
import theodolite.util.Resource
import theodolite.util.Results
import java.time.Duration import java.time.Duration
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
class TheodoliteExecutor() { class TheodoliteExecutor() {
val path = "/home/lorenz/git/spesb/theodolite-quarkus/src/main/resources/yaml"
private fun loadConfig(): Config { private fun loadConfig(): Config {
val benchmark: UC1Benchmark = UC1Benchmark( val benchmark: UC1Benchmark = UC1Benchmark(
UC1Benchmark.UC1BenchmarkConfig( UC1Benchmark.UC1BenchmarkConfig( // use port forward 2181 -> 2181
zookeeperConnectionString = "my-confluent-cp-zookeeper:2181", zookeeperConnectionString = "localhost:2181", //"my-confluent-cp-zookeeper:2181", //localhost:2181.
kafkaIPConnectionString = "my-confluent-cp-kafka:9092", kafkaIPConnectionString = "localhost:9092",//"my-confluent-cp-kafka:","178.18.0."
schemaRegistryConnectionString = "http://my-confluent-cp-schema-registry:8081", schemaRegistryConnectionString = "http://my-confluent-cp-schema-registry:8081",
kafkaPartition = 40, kafkaPartition = 40,
kafkaReplication = 3, kafkaReplication = 1,
kafkaTopics = listOf("input", "output"), kafkaTopics = listOf("input", "output"),
// TODO("handle path in a more nice way (not absolut)") // TODO("handle path in a more nice way (not absolut)")
ucDeploymentPath = "src/main/resources/yaml/aggregation-deployment.yaml", ucDeploymentPath = path + "/aggregation-deployment.yaml",
ucServicePath = "src/main/resources/yaml/aggregation-service.yaml", ucServicePath = path + "/aggregation-service.yaml",
wgDeploymentPath = "src/main/resources/yaml/workloadGenerator.yaml", wgDeploymentPath = path + "/workloadGenerator.yaml",
configMapPath = path + "/jmx-configmap.yaml",
ucImageURL = "ghcr.io/cau-se/theodolite-uc1-kstreams-app:latest", ucImageURL = "ghcr.io/cau-se/theodolite-uc1-kstreams-app:latest",
wgImageURL = "ghcr.io/cau-se/theodolite-uc1-kstreams-workload-generator:latest" wgImageURL = "ghcr.io/cau-se/theodolite-uc1-workload-generator:theodolite-kotlin-latest"
)) )
)
val results: Results = Results() val results: Results = Results()
val executionDuration = Duration.ofSeconds(60*5)
val executionDuration = Duration.ofSeconds(60 * 5)
val executor: BenchmarkExecutor = KafkaBenchmarkExecutor(benchmark, results, executionDuration) val executor: BenchmarkExecutor = KafkaBenchmarkExecutor(benchmark, results, executionDuration)
val restrictionStrategy = LowerBoundRestriction(results) val restrictionStrategy = LowerBoundRestriction(results)
val searchStrategy = LinearSearch(executor, results) val searchStrategy = LinearSearch(executor, results)
return Config( return Config(
loads = (0..6).map{ number -> LoadDimension(number) }, loads = (1..6).map { number -> LoadDimension(number) },
resources = (0..6).map{ number -> Resource(number) }, resources = (1..6).map { number -> Resource(number) },
compositeStrategy = CompositeStrategy(executor, searchStrategy, restrictionStrategies = setOf(restrictionStrategy), results = results), compositeStrategy = CompositeStrategy(
executor,
searchStrategy,
restrictionStrategies = setOf(restrictionStrategy),
results = results
),
executionDuration = executionDuration executionDuration = executionDuration
) )
} }
...@@ -47,9 +60,8 @@ class TheodoliteExecutor() { ...@@ -47,9 +60,8 @@ class TheodoliteExecutor() {
val config = this.loadConfig() val config = this.loadConfig()
// execute benchmarks for each load // execute benchmarks for each load
for(load in config.loads) { for (load in config.loads) {
config.compositeStrategy.findSuitableResource(load, config.resources) config.compositeStrategy.findSuitableResource(load, config.resources)
} }
} }
} }
\ No newline at end of file
...@@ -11,7 +11,7 @@ class ConfigMapManager(client: NamespacedKubernetesClient) { ...@@ -11,7 +11,7 @@ class ConfigMapManager(client: NamespacedKubernetesClient) {
} }
fun deploy(configMap: ConfigMap) { fun deploy(configMap: ConfigMap) {
client.configMaps().create(configMap) client.configMaps().createOrReplace(configMap)
} }
fun delete(configMap: ConfigMap) { fun delete(configMap: ConfigMap) {
......
...@@ -75,7 +75,7 @@ class DeploymentManager(client: NamespacedKubernetesClient) { ...@@ -75,7 +75,7 @@ class DeploymentManager(client: NamespacedKubernetesClient) {
// TODO potential add exception handling // TODO potential add exception handling
fun deploy(deployment: Deployment) { fun deploy(deployment: Deployment) {
client.apps().deployments().create(deployment) client.apps().deployments().createOrReplace(deployment)
} }
// TODO potential add exception handling // TODO potential add exception handling
......
...@@ -18,7 +18,7 @@ class ServiceManager(client: NamespacedKubernetesClient) { ...@@ -18,7 +18,7 @@ class ServiceManager(client: NamespacedKubernetesClient) {
} }
fun deploy(service: Service) { fun deploy(service: Service) {
client.services().create(service) client.services().createOrReplace(service)
} }
fun delete(service: Service) { fun delete(service: Service) {
......
package theodolite.k8s package theodolite.k8s
import io.fabric8.kubernetes.api.model.ConfigMap
import io.fabric8.kubernetes.api.model.Service import io.fabric8.kubernetes.api.model.Service
import io.fabric8.kubernetes.api.model.apps.Deployment import io.fabric8.kubernetes.api.model.apps.Deployment
import io.fabric8.kubernetes.client.DefaultKubernetesClient import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import mu.KotlinLogging
import theodolite.util.Benchmark import theodolite.util.Benchmark
import theodolite.util.LoadDimension import theodolite.util.LoadDimension
import theodolite.util.Resource import theodolite.util.Resource
private val logger = KotlinLogging.logger {}
class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) { class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
val workloadGeneratorStateCleaner: WorkloadGeneratorStateCleaner val workloadGeneratorStateCleaner: WorkloadGeneratorStateCleaner
val topicManager: TopicManager val topicManager: TopicManager
// TODO("service monitor") // TODO("service monitor")
val kubernetesClient: NamespacedKubernetesClient val kubernetesClient: NamespacedKubernetesClient
val yamlLoader: YamlLoader val yamlLoader: YamlLoader
val deploymentManager: DeploymentManager val deploymentManager: DeploymentManager
val serviceManager: ServiceManager val serviceManager: ServiceManager
val configMapManager: ConfigMapManager
var ucDeployment: Deployment var ucDeployment: Deployment
var ucService: Service var ucService: Service
var wgDeployment: Deployment
var configMap: ConfigMap
init { init {
this.workloadGeneratorStateCleaner = WorkloadGeneratorStateCleaner(this.config.zookeeperConnectionString) this.workloadGeneratorStateCleaner = WorkloadGeneratorStateCleaner(this.config.zookeeperConnectionString)
this.topicManager = TopicManager(this.config.kafkaIPConnectionString) this.topicManager = TopicManager(this.config.kafkaIPConnectionString)
this.kubernetesClient = DefaultKubernetesClient() this.kubernetesClient = DefaultKubernetesClient().inNamespace("default")
this.yamlLoader = YamlLoader(this.kubernetesClient) this.yamlLoader = YamlLoader(this.kubernetesClient)
this.deploymentManager = DeploymentManager(this.kubernetesClient) this.deploymentManager = DeploymentManager(this.kubernetesClient)
this.serviceManager = ServiceManager(this.kubernetesClient) this.serviceManager = ServiceManager(this.kubernetesClient)
this.configMapManager = ConfigMapManager(this.kubernetesClient)
ucDeployment = this.yamlLoader.loadDeployment(this.config.ucDeploymentPath) ucDeployment = this.yamlLoader.loadDeployment(this.config.ucDeploymentPath)
ucService = this.yamlLoader.loadService(this.config.ucServicePath) ucService = this.yamlLoader.loadService(this.config.ucServicePath)
wgDeployment = this.yamlLoader.loadDeployment(this.config.wgDeploymentPath)
configMap = this.yamlLoader.loadConfigmap(this.config.configMapPath)
} }
override fun clearClusterEnvironment() { override fun clearClusterEnvironment() {
...@@ -36,12 +47,19 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) { ...@@ -36,12 +47,19 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
this.topicManager.deleteTopics(this.config.kafkaTopics) this.topicManager.deleteTopics(this.config.kafkaTopics)
this.deploymentManager.delete(this.ucDeployment) this.deploymentManager.delete(this.ucDeployment)
this.serviceManager.delete(this.ucService) this.serviceManager.delete(this.ucService)
this.deploymentManager.delete(this.wgDeployment)
} }
override fun initializeClusterEnvironment() { override fun initializeClusterEnvironment() {
this.workloadGeneratorStateCleaner.deleteAll() // this.workloadGeneratorStateCleaner.deleteAll()
// since the workloadGenerators are not started they cant be deleted
this.topicManager.deleteTopics(this.config.kafkaTopics) this.topicManager.deleteTopics(this.config.kafkaTopics)
this.topicManager.createTopics(this.config.kafkaTopics, this.config.kafkaPartition, this.config.kafkaReplication) this.topicManager.createTopics(
this.config.kafkaTopics,
this.config.kafkaPartition,
this.config.kafkaReplication
)
} }
override fun startSUT(resources: Resource) { override fun startSUT(resources: Resource) {
...@@ -49,30 +67,36 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) { ...@@ -49,30 +67,36 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
// set environment variables // set environment variables
val environmentVariables: MutableMap<String, String> = mutableMapOf() val environmentVariables: MutableMap<String, String> = mutableMapOf()
environmentVariables.put("KAFKA_BOOTSTRAP_SERVER", this.config.kafkaIPConnectionString) //environmentVariables.put("KAFKA_BOOTSTRAP_SERVERS", this.config.kafkaIPConnectionString)
environmentVariables.put("SCHEMA_REGISTRY_URL", this.config.schemaRegistryConnectionString) //environmentVariables.put("SCHEMA_REGISTRY_URL", this.config.schemaRegistryConnectionString)
// setup deployment // setup deployment
this.deploymentManager.setReplica(ucDeployment, resources.get()) this.deploymentManager.setReplica(ucDeployment, resources.get())
this.deploymentManager.setWorkloadEnv(ucDeployment,"uc-application", environmentVariables) this.deploymentManager.setWorkloadEnv(ucDeployment, "uc-application", environmentVariables)
// create kubernetes resources // create kubernetes resources
this.deploymentManager.deploy(ucDeployment) this.deploymentManager.deploy(ucDeployment)
this.serviceManager.deploy(ucService) this.serviceManager.deploy(ucService)
this.configMapManager.deploy(configMap)
} }
override fun startWorkloadGenerator(load: LoadDimension) { override fun startWorkloadGenerator(load: LoadDimension) {
this.deploymentManager.setImageName(ucDeployment, "workload-generator", this.config.wgImageURL) this.deploymentManager.setImageName(wgDeployment, "workload-generator", this.config.wgImageURL)
val wgDeployment = this.yamlLoader.loadDeployment(this.config.wgDeploymentPath)
// TODO ("calculate number of required instances") // TODO ("calculate number of required instances")
val requiredInstances: Int = 1 val requiredInstances: Int = 1
val environmentVariables: MutableMap<String, String> = mutableMapOf() val environmentVariables: MutableMap<String, String> = mutableMapOf()
//environmentVariables.put("KAFKA_BOOTSTRAP_SERVERS", this.config.kafkaIPConnectionString)
//environmentVariables.put("ZK_HOST", this.config.zookeeperConnectionString.split(":")[0])
//environmentVariables.put("ZK_PORT", this.config.zookeeperConnectionString.split(":")[1])
environmentVariables.put("NUM_SENSORS", load.get().toString()) environmentVariables.put("NUM_SENSORS", load.get().toString())
environmentVariables.put("NUM_INSTANCES", requiredInstances.toString()) environmentVariables.put("INSTANCES", requiredInstances.toString())
this.deploymentManager.setWorkloadEnv(wgDeployment, "workload-generator", environmentVariables) this.deploymentManager.setWorkloadEnv(this.wgDeployment, "workload-generator", environmentVariables)
this.deploymentManager.deploy(this.wgDeployment)
} }
data class UC1BenchmarkConfig( data class UC1BenchmarkConfig(
...@@ -84,8 +108,9 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) { ...@@ -84,8 +108,9 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
val kafkaPartition: Int, val kafkaPartition: Int,
val ucDeploymentPath: String, val ucDeploymentPath: String,
val ucServicePath: String, val ucServicePath: String,
val configMapPath: String,
val wgDeploymentPath: String, val wgDeploymentPath: String,
val ucImageURL: String, val ucImageURL: String,
val wgImageURL: String val wgImageURL: String
) {} ) {}
} }
\ No newline at end of file
...@@ -2,19 +2,22 @@ package theodolite.util ...@@ -2,19 +2,22 @@ package theodolite.util
import theodolite.k8s.UC1Benchmark import theodolite.k8s.UC1Benchmark
class TestBenchmark: Benchmark(UC1Benchmark.UC1BenchmarkConfig( class TestBenchmark : Benchmark(
zookeeperConnectionString = "", UC1Benchmark.UC1BenchmarkConfig(
kafkaIPConnectionString = "", zookeeperConnectionString = "",
schemaRegistryConnectionString = "", kafkaIPConnectionString = "",
kafkaTopics = emptyList(), schemaRegistryConnectionString = "",
kafkaReplication = 0, kafkaTopics = emptyList(),
kafkaPartition = 0, kafkaReplication = 0,
ucServicePath = "", kafkaPartition = 0,
ucDeploymentPath = "", ucServicePath = "",
wgDeploymentPath = "", ucDeploymentPath = "",
ucImageURL = "", wgDeploymentPath = "",
wgImageURL = "" configMapPath = "",
)){ ucImageURL = "",
wgImageURL = ""
)
) {
override fun initializeClusterEnvironment() { override fun initializeClusterEnvironment() {
TODO("Not yet implemented") TODO("Not yet implemented")
...@@ -31,4 +34,4 @@ class TestBenchmark: Benchmark(UC1Benchmark.UC1BenchmarkConfig( ...@@ -31,4 +34,4 @@ class TestBenchmark: Benchmark(UC1Benchmark.UC1BenchmarkConfig(
override fun startWorkloadGenerator(load: LoadDimension) { override fun startWorkloadGenerator(load: LoadDimension) {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
} }
\ No newline at end of file
...@@ -14,25 +14,21 @@ spec: ...@@ -14,25 +14,21 @@ spec:
spec: spec:
terminationGracePeriodSeconds: 0 terminationGracePeriodSeconds: 0
containers: containers:
- name: workload-generator - name: workload-generator
image: workload-generator:latest image: workload-generator:latest
env: env:
# Order need to be preserved for run_uc.py # Order need to be preserved for run_uc.py
- name: NUM_SENSORS - name: NUM_SENSORS
value: "25000" value: "25000"
- name: INSTANCES - name: INSTANCES
value: "1" value: "1"
- name: NUM_NESTED_GROUPS - name: NUM_NESTED_GROUPS
value: "5" value: "5"
- name: ZK_HOST - name: ZK_HOST
value: "my-confluent-cp-zookeeper" value: "my-confluent-cp-zookeeper"
- name: ZK_PORT - name: ZK_PORT
value: "2181" value: "2181"
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL - name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081" value: "http://my-confluent-cp-schema-registry:8081"
- name: POD_NAME \ No newline at end of file
valueFrom:
fieldRef:
fieldPath: metadata.name
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment