Skip to content
Snippets Groups Projects
Commit fa0af504 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Fix creatOrReplace and workloadGenrator deletion, but not kafka

parent 53128498
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus,!78Resolve "Implement Quarkus/Kotlin protype"
......@@ -3,7 +3,7 @@
## ------------------------------------------------------
cp-zookeeper:
enabled: true
servers: 3
servers: 1
image: confluentinc/cp-zookeeper
imageTag: 5.4.0
## Optionally specify an array of imagePullSecrets. Secrets must be manually created in the namespace.
......@@ -28,7 +28,7 @@ cp-zookeeper:
## ------------------------------------------------------
cp-kafka:
enabled: true
brokers: 10
brokers: 1
image: confluentinc/cp-enterprise-kafka
imageTag: 5.4.0
## Optionally specify an array of imagePullSecrets. Secrets must be manually created in the namespace.
......@@ -48,7 +48,7 @@ cp-kafka:
# cpu: 100m
# memory: 128Mi
configurationOverrides:
#"offsets.topic.replication.factor": "3"
offsets.topic.replication.factor: "1"
"message.max.bytes": "134217728" # 128 MB
"replica.fetch.max.bytes": "134217728" # 128 MB
# "default.replication.factor": 3
......
cp-helm-charts:
## ------------------------------------------------------
## Zookeeper
## ------------------------------------------------------
cp-zookeeper:
servers: 1 # default: 3
## ------------------------------------------------------
## Kafka
## ------------------------------------------------------
cp-kafka:
brokers: 1 # deauflt: 10
configurationOverrides:
offsets.topic.replication.factor: "3"
\ No newline at end of file
......@@ -4,37 +4,47 @@ import theodolite.k8s.UC1Benchmark
import theodolite.strategies.restriction.LowerBoundRestriction
import theodolite.strategies.searchstrategy.CompositeStrategy
import theodolite.strategies.searchstrategy.LinearSearch
import theodolite.util.*
import theodolite.util.Config
import theodolite.util.LoadDimension
import theodolite.util.Resource
import theodolite.util.Results
import java.time.Duration
class TheodoliteExecutor() {
val path = "/home/lorenz/git/spesb/theodolite-quarkus/src/main/resources/yaml"
private fun loadConfig(): Config {
val benchmark: UC1Benchmark = UC1Benchmark(
UC1Benchmark.UC1BenchmarkConfig(
zookeeperConnectionString = "my-confluent-cp-zookeeper:2181",
kafkaIPConnectionString = "my-confluent-cp-kafka:9092",
UC1Benchmark.UC1BenchmarkConfig( // use port forward 2181 -> 2181
zookeeperConnectionString = "127.0.0.1:2181", //"my-confluent-cp-zookeeper:2181", //localhost:2181.
kafkaIPConnectionString = "localhost:9093",//"my-confluent-cp-kafka:","178.18.0."
schemaRegistryConnectionString = "http://my-confluent-cp-schema-registry:8081",
kafkaPartition = 40,
kafkaReplication = 3,
kafkaReplication = 1,
kafkaTopics = listOf("input", "output"),
// TODO("handle path in a more nice way (not absolut)")
ucDeploymentPath = "src/main/resources/yaml/aggregation-deployment.yaml",
ucServicePath = "src/main/resources/yaml/aggregation-service.yaml",
wgDeploymentPath = "src/main/resources/yaml/workloadGenerator.yaml",
ucDeploymentPath = path + "/aggregation-deployment.yaml",
ucServicePath = path + "/aggregation-service.yaml",
wgDeploymentPath = path + "/workloadGenerator.yaml",
ucImageURL = "ghcr.io/cau-se/theodolite-uc1-kstreams-app:latest",
wgImageURL = "ghcr.io/cau-se/theodolite-uc1-kstreams-workload-generator:latest"
))
)
)
val results: Results = Results()
val executionDuration = Duration.ofSeconds(60*5 )
val executionDuration = Duration.ofSeconds(60 * 5)
val executor: BenchmarkExecutor = KafkaBenchmarkExecutor(benchmark, results, executionDuration)
val restrictionStrategy = LowerBoundRestriction(results)
val searchStrategy = LinearSearch(executor, results)
return Config(
loads = (0..6).map{ number -> LoadDimension(number) },
resources = (0..6).map{ number -> Resource(number) },
compositeStrategy = CompositeStrategy(executor, searchStrategy, restrictionStrategies = setOf(restrictionStrategy), results = results),
loads = (0..6).map { number -> LoadDimension(number) },
resources = (0..6).map { number -> Resource(number) },
compositeStrategy = CompositeStrategy(
executor,
searchStrategy,
restrictionStrategies = setOf(restrictionStrategy),
results = results
),
executionDuration = executionDuration
)
}
......@@ -44,7 +54,7 @@ class TheodoliteExecutor() {
val config = this.loadConfig()
// execute benchmarks for each load
for(load in config.loads) {
for (load in config.loads) {
config.compositeStrategy.findSuitableResources(load, config.resources)
}
......
......@@ -11,7 +11,7 @@ class ConfigMapManager(client: NamespacedKubernetesClient) {
}
fun deploy(configMap: ConfigMap) {
client.configMaps().create(configMap)
client.configMaps().createOrReplace(configMap)
}
fun delete(configMap: ConfigMap) {
......
......@@ -75,7 +75,7 @@ class DeploymentManager(client: NamespacedKubernetesClient) {
// TODO potential add exception handling
fun deploy(deployment: Deployment) {
client.apps().deployments().create(deployment)
client.apps().deployments().createOrReplace(deployment)
}
// TODO potential add exception handling
......
......@@ -18,7 +18,7 @@ class ServiceManager(client: NamespacedKubernetesClient) {
}
fun deploy(service: Service) {
client.services().create(service)
client.services().createOrReplace(service)
}
fun delete(service: Service) {
......
......@@ -11,6 +11,7 @@ import theodolite.util.Resource
class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
val workloadGeneratorStateCleaner: WorkloadGeneratorStateCleaner
val topicManager: TopicManager
// TODO("service monitor")
val kubernetesClient: NamespacedKubernetesClient
val yamlLoader: YamlLoader
......@@ -23,7 +24,7 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
init {
this.workloadGeneratorStateCleaner = WorkloadGeneratorStateCleaner(this.config.zookeeperConnectionString)
this.topicManager = TopicManager(this.config.kafkaIPConnectionString)
this.kubernetesClient = DefaultKubernetesClient()
this.kubernetesClient = DefaultKubernetesClient().inNamespace("default")
this.yamlLoader = YamlLoader(this.kubernetesClient)
this.deploymentManager = DeploymentManager(this.kubernetesClient)
this.serviceManager = ServiceManager(this.kubernetesClient)
......@@ -39,9 +40,15 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
}
override fun initializeClusterEnvironment() {
this.workloadGeneratorStateCleaner.deleteAll()
// this.workloadGeneratorStateCleaner.deleteAll()
// since the workloadGenerators are not started they cant be deleted
this.topicManager.deleteTopics(this.config.kafkaTopics)
this.topicManager.createTopics(this.config.kafkaTopics, this.config.kafkaPartition, this.config.kafkaReplication)
this.topicManager.createTopics(
this.config.kafkaTopics,
this.config.kafkaPartition,
this.config.kafkaReplication
)
}
override fun startSUT(resources: Resource) {
......@@ -49,13 +56,13 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
// set environment variables
val environmentVariables: MutableMap<String, String> = mutableMapOf()
environmentVariables.put("KAFKA_BOOTSTRAP_SERVER", this.config.kafkaIPConnectionString)
environmentVariables.put("KAFKA_BOOTSTRAP_SERVERS", this.config.kafkaIPConnectionString)
environmentVariables.put("SCHEMA_REGISTRY_URL", this.config.schemaRegistryConnectionString)
// setup deployment
this.deploymentManager.setReplica(ucDeployment, resources.get())
this.deploymentManager.setWorkloadEnv(ucDeployment,"uc-application", environmentVariables)
this.deploymentManager.setWorkloadEnv(ucDeployment, "uc-application", environmentVariables)
// create kubernetes resources
this.deploymentManager.deploy(ucDeployment)
......@@ -69,6 +76,11 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
// TODO ("calculate number of required instances")
val requiredInstances: Int = 1
val environmentVariables: MutableMap<String, String> = mutableMapOf()
environmentVariables.put("KAFKA_BOOTSTRAP_SERVERS", this.config.kafkaIPConnectionString)
environmentVariables.put("ZK_HOST", this.config.schemaRegistryConnectionString.split(":")[0])
environmentVariables.put("ZK_PORT", this.config.schemaRegistryConnectionString.split(":")[1])
environmentVariables.put("NUM_SENSORS", load.get().toString())
environmentVariables.put("NUM_INSTANCES", requiredInstances.toString())
......@@ -87,5 +99,5 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
val wgDeploymentPath: String,
val ucImageURL: String,
val wgImageURL: String
) {}
) {}
}
\ No newline at end of file
package theodolite.util
class TestBenchmark: Benchmark(config = emptyMap()) {
override fun start() {
TODO("Not yet implemented")
}
override fun clearClusterEnvironment() {
TODO("Not yet implemented")
}
override fun startWorkloadGenerator(wg: String, dimValue: Int, ucId: String) {
TODO("Not yet implemented")
}
}
\ No newline at end of file
//class TestBenchmark: Benchmark(config = emptyMap()) {
// override fun start() {
// TODO("Not yet implemented")
// }
//
// override fun clearClusterEnvironment() {
// TODO("Not yet implemented")
// }
//
// override fun startWorkloadGenerator(wg: String, dimValue: Int, ucId: String) {
// TODO("Not yet implemented")
// }
//}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment