Skip to content
Snippets Groups Projects
Commit 8210e678 authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

Clean up

Remove old unused classes
Enhance code quality
some clean up
parent 82067307
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!85Introduce new Benchmark class and Patcher,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing
with 16 additions and 365 deletions
......@@ -28,7 +28,7 @@ class KubernetesBenchmark(): Benchmark {
val resourcePath = "$basePath/$resource"
val kind = parser.parse(resourcePath, HashMap<String, String>()::class.java)?.get("kind") !!
val k8sResource = loader.loadK8sResource(kind , resourcePath)
Pair<String, KubernetesResource>(resource, k8sResource)
Pair(resource, k8sResource)
}
}
......@@ -45,12 +45,11 @@ class KubernetesBenchmark(): Benchmark {
// patch overrides
configurationOverrides.forEach{ override -> patcherManager.applyPatcher(listOf(override.patcher), resources, override.value)}
resources.forEach { r -> println(r) }
return KubernetesBenchmarkDeployment(
resources.map { r -> r.second },
kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapSever),
zookeeperConfig = zookeeperConfig["server"].toString(),
topics = kafkaConfig.topics.map { topic -> NewTopic(topic.name, topic.partition, topic.replication ) })
topics = kafkaConfig.topics)
}
}
......@@ -13,7 +13,6 @@ class KubernetesBenchmarkDeployment(
private val kafkaConfig: HashMap<String, Any>,
private val zookeeperConfig: String,
private val topics: Collection<NewTopic>
// Maybe more
): BenchmarkDeployment {
private val workloadGeneratorStateCleaner = WorkloadGeneratorStateCleaner(this.zookeeperConfig)
private val kafkaController = TopicManager(this.kafkaConfig)
......
package theodolite.deprecated
import theodolite.util.LoadDimension
import theodolite.util.Resource
abstract class AbstractBenchmark(val config: Config): Benchmark {
override fun start(load: LoadDimension, resources: Resource) {
this.clearClusterEnvironment()
this.initializeClusterEnvironment()
this.startSUT(resources)
this.startWorkloadGenerator(load)
}
data class Config(
val clusterZookeeperConnectionString: String,
val clusterKafkaConnectionString: String,
val externalZookeeperConnectionString: String,
val externalKafkaConnectionString: String,
val schemaRegistryConnectionString: String,
val kafkaTopics: List<String>,
val kafkaReplication: Short,
val kafkaPartition: Int,
val ucDeploymentPath: String,
val ucServicePath: String,
val configMapPath: String,
val wgDeploymentPath: String,
val ucImageURL: String,
val wgImageURL: String
) {}
}
\ No newline at end of file
package theodolite.deprecated
import theodolite.util.LoadDimension
import theodolite.util.Resource
interface Benchmark {
fun start(load: LoadDimension, resources: Resource) {
}
fun initializeClusterEnvironment();
fun clearClusterEnvironment();
fun startSUT(resources: Resource);
fun startWorkloadGenerator(load: LoadDimension)
}
package theodolite.deprecated
import io.fabric8.kubernetes.api.model.ConfigMap
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
class ConfigMapManager(private val client: NamespacedKubernetesClient) {
fun deploy(configMap: ConfigMap) {
this.client.configMaps().createOrReplace(configMap)
}
fun delete(configMap: ConfigMap) {
this.client.configMaps().delete(configMap)
}
}
package theodolite.deprecated
import io.fabric8.kubernetes.api.model.Container
import io.fabric8.kubernetes.api.model.EnvVar
import io.fabric8.kubernetes.api.model.EnvVarSource
import io.fabric8.kubernetes.api.model.Quantity
import io.fabric8.kubernetes.api.model.apps.Deployment
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import mu.KotlinLogging
private val logger = KotlinLogging.logger {}
class DeploymentManager(private val client: NamespacedKubernetesClient) {
/**
* Sets the ContainerEvironmentVariables, creates new if variable don t exist.
* @param container - The Container
* @param map - Map of k=Name,v =Value of EnviromentVariables
*/
private fun setContainerEnv(container: Container, map: Map<String, String>) {
map.forEach { k, v ->
// filter for mathing name and set value
val x = container.env.filter { envVar -> envVar.name == k }
if (x.isEmpty()) {
val newVar = EnvVar(k, v, EnvVarSource())
container.env.add(newVar)
} else {
x.forEach {
it.value = v
}
}
}
}
/**
* Set the environment Variable for a container
*/
fun setWorkloadEnv(workloadDeployment: Deployment, containerName: String, map: Map<String, String>) {
workloadDeployment.spec.template.spec.containers.filter { it.name == containerName }
.forEach { it: Container ->
setContainerEnv(it, map)
}
}
/**
* Change the RessourceLimit of a container (Usally SUT)
*/
fun changeRessourceLimits(deployment: Deployment, ressource: String, containerName: String, limit: String) {
deployment.spec.template.spec.containers.filter { it.name == containerName }.forEach {
it.resources.limits.replace(ressource, Quantity(limit))
}
}
/**
* Change the image name of a container (SUT and the Worklaodgenerators)
*/
fun setImageName(deployment: Deployment, containerName: String, image: String) {
deployment.spec.template.spec.containers.filter { it.name == containerName }.forEach {
it.image = image
}
}
/**
* Change the image name of a container (SUT and the Worklaodgenerators)
*/
fun setReplica(deployment: Deployment, replicas: Int) {
deployment.spec.setReplicas(replicas)
}
// TODO potential add exception handling
fun deploy(deployment: Deployment) {
this.client.apps().deployments().createOrReplace(deployment)
}
// TODO potential add exception handling
fun delete(deployment: Deployment) {
this.client.apps().deployments().delete(deployment)
}
}
package theodolite.deprecated
import io.fabric8.kubernetes.api.model.Service
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
class ServiceManager(private val client: NamespacedKubernetesClient) {
fun changeServiceName(service: Service, newName: String) {
service.metadata.apply {
name = newName
}
}
fun deploy(service: Service) {
client.services().createOrReplace(service)
}
fun delete(service: Service) {
client.services().delete(service)
}
}
package theodolite.deprecated
/*
import mu.KotlinLogging
import theodolite.deprecated.AbstractBenchmark
import theodolite.k8s.UC1Benchmark
import theodolite.strategies.restriction.LowerBoundRestriction
import theodolite.strategies.searchstrategy.CompositeStrategy
import theodolite.strategies.searchstrategy.LinearSearch
import theodolite.util.*
import java.nio.file.Paths
import java.time.Duration
private val logger = KotlinLogging.logger {}
class TheodoliteExecutor() {
val projectDirAbsolutePath = Paths.get("").toAbsolutePath().toString()
val resourcesPath = Paths.get(projectDirAbsolutePath, "./../../../resources/main/yaml/")
private fun loadConfig(): Config {
logger.info { resourcesPath }
val benchmark: UC1Benchmark = UC1Benchmark(
AbstractBenchmark.Config(
clusterZookeeperConnectionString = "my-confluent-cp-zookeeper:2181",
clusterKafkaConnectionString = "my-confluent-cp-kafka:9092",
externalZookeeperConnectionString = "localhost:2181",
externalKafkaConnectionString = "localhost:9092",
schemaRegistryConnectionString = "http://my-confluent-cp-schema-registry:8081",
kafkaPartition = 40,
kafkaReplication = 1,
kafkaTopics = listOf("input", "output"),
// TODO("handle path in a more nice way (not absolut)")
ucDeploymentPath = "$resourcesPath/aggregation-deployment.yaml",
ucServicePath = "$resourcesPath/aggregation-service.yaml",
wgDeploymentPath = "$resourcesPath/workloadGenerator.yaml",
configMapPath = "$resourcesPath/jmx-configmap.yaml",
ucImageURL = "ghcr.io/cau-se/theodolite-uc1-kstreams-app:latest",
wgImageURL = "ghcr.io/cau-se/theodolite-uc1-workload-generator:theodolite-kotlin-latest"
)
)
val results: Results = Results()
val executionDuration = Duration.ofSeconds(60 * 5)
val executor: BenchmarkExecutor = BenchmarkExecutorImpl(benchmark, results, executionDuration)
val restrictionStrategy = LowerBoundRestriction(results)
val searchStrategy = LinearSearch(executor)
return Config(
loads = listOf(5000, 10000).map { number -> LoadDimension(number) },
resources = (1..6).map { number -> Resource(number) },
compositeStrategy = CompositeStrategy(
executor,
searchStrategy,
restrictionStrategies = setOf(restrictionStrategy)
),
executionDuration = executionDuration
)
}
fun run() {
// read or get benchmark config
val config = this.loadConfig()
// execute benchmarks for each load
for (load in config.loads) {
config.compositeStrategy.findSuitableResource(load, config.resources)
}
}
}
*/
\ No newline at end of file
package theodolite.deprecated
/*
import io.fabric8.kubernetes.api.model.ConfigMap
import io.fabric8.kubernetes.api.model.Service
import io.fabric8.kubernetes.api.model.apps.Deployment
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import mu.KotlinLogging
import theodolite.deprecated.AbstractBenchmark
import theodolite.util.LoadDimension
import theodolite.util.Resource
private val logger = KotlinLogging.logger {}
class UC1Benchmark(config: Config) : AbstractBenchmark(config) {
private val workloadGeneratorStateCleaner: WorkloadGeneratorStateCleaner
private val topicManager: TopicManager
// TODO("service monitor")
private val kubernetesClient: NamespacedKubernetesClient
private val yamlLoader: YamlLoader
private val deploymentManager: DeploymentManager
private val serviceManager: ServiceManager
private val configMapManager: ConfigMapManager
private var ucDeployment: Deployment
private var ucService: Service
private var wgDeployment: Deployment
private var configMap: ConfigMap
init {
this.workloadGeneratorStateCleaner =
WorkloadGeneratorStateCleaner(this.config.externalZookeeperConnectionString)
this.topicManager = TopicManager(hashMapOf("bootstrap.servers" to this.config.externalKafkaConnectionString))
this.kubernetesClient = DefaultKubernetesClient().inNamespace("default")
this.yamlLoader = YamlLoader(this.kubernetesClient)
this.deploymentManager = DeploymentManager(this.kubernetesClient)
this.serviceManager = ServiceManager(this.kubernetesClient)
this.configMapManager = ConfigMapManager(this.kubernetesClient)
ucDeployment = this.yamlLoader.loadDeployment(this.config.ucDeploymentPath)
ucService = this.yamlLoader.loadService(this.config.ucServicePath)
wgDeployment = this.yamlLoader.loadDeployment(this.config.wgDeploymentPath)
configMap = this.yamlLoader.loadConfigmap(this.config.configMapPath)
}
override fun clearClusterEnvironment() {
this.workloadGeneratorStateCleaner.deleteState()
this.topicManager.removeTopics(this.config.kafkaTopics)
this.deploymentManager.delete(this.ucDeployment)
this.serviceManager.delete(this.ucService)
this.deploymentManager.delete(this.wgDeployment)
}
override fun initializeClusterEnvironment() {
this.topicManager.createTopics(
this.config.kafkaTopics,
this.config.kafkaPartition,
this.config.kafkaReplication
)
}
override fun startSUT(resources: Resource) {
this.deploymentManager.setImageName(ucDeployment, "uc-application", this.config.ucImageURL)
// set environment variables
val environmentVariables: MutableMap<String, String> = mutableMapOf()
environmentVariables.put("KAFKA_BOOTSTRAP_SERVERS", this.config.clusterKafkaConnectionString)
environmentVariables.put("SCHEMA_REGISTRY_URL", this.config.schemaRegistryConnectionString)
// setup deployment
this.deploymentManager.setReplica(ucDeployment, resources.get())
this.deploymentManager.setWorkloadEnv(ucDeployment, "uc-application", environmentVariables)
// create kubernetes resources
this.deploymentManager.deploy(ucDeployment)
this.serviceManager.deploy(ucService)
this.configMapManager.deploy(configMap)
}
override fun startWorkloadGenerator(load: LoadDimension) {
this.deploymentManager.setImageName(wgDeployment, "workload-generator", this.config.wgImageURL)
// TODO ("calculate number of required instances")
val requiredInstances = 1
val environmentVariables: MutableMap<String, String> = mutableMapOf()
environmentVariables.put("KAFKA_BOOTSTRAP_SERVERS", this.config.clusterKafkaConnectionString)
environmentVariables.put("ZK_HOST", this.config.clusterZookeeperConnectionString.split(":")[0])
environmentVariables.put("ZK_PORT", this.config.clusterZookeeperConnectionString.split(":")[1])
environmentVariables["NUM_SENSORS"] = load.get().toString()
environmentVariables["INSTANCES"] = requiredInstances.toString()
this.deploymentManager.setWorkloadEnv(this.wgDeployment, "workload-generator", environmentVariables)
this.deploymentManager.deploy(this.wgDeployment)
}
}
*/
\ No newline at end of file
......@@ -12,9 +12,7 @@ import java.time.Duration
class TheodoliteExecutor(
private val config: BenchmarkExecution,
private val kubernetesBenchmark: KubernetesBenchmark
)
{
private val kubernetesBenchmark: KubernetesBenchmark) {
private fun buildConfig(): Config{
val results = Results()
......@@ -25,12 +23,11 @@ class TheodoliteExecutor(
return Config(
loads = config.load.loadValues.map { load -> LoadDimension(load, config.load.loadType ) },
resources = config.resources.resourceValues.map { resource -> Resource(resource, config.load.loadType) },
resources = config.resources.resourceValues.map { resource -> Resource(resource, config.resources.resourceType) },
compositeStrategy = CompositeStrategy(
benchmarkExecutor = executor,
searchStrategy = strategyFactory.createSearchStrategy(executor, config.execution.strategy),
restrictionStrategies = strategyFactory.createRestrictionStrategy(results, config.execution.restrictions)),
executionDuration = executionDuration)
restrictionStrategies = strategyFactory.createRestrictionStrategy(results, config.execution.restrictions)))
}
......@@ -42,6 +39,5 @@ class TheodoliteExecutor(
for (load in config.loads) {
config.compositeStrategy.findSuitableResource(load, config.resources)
}
}
}
......@@ -49,7 +49,6 @@ class TopicManager(kafkaConfig: HashMap<String, Any>) {
* @param topics
*/
fun removeTopics(topics: List<String>) {
val result = kafkaAdmin.deleteTopics(topics)
try {
......
......@@ -10,10 +10,9 @@ import java.time.Duration
private val logger = KotlinLogging.logger {}
/**
* Resets the workloadgenerator states in zookeper (and potentially watches for Zookeper events)
* Resets the Workloadgenerator states in Zookeeper (and potentially watches for Zookeeper events)
*
* @param connectionString of zookeeper
* @param path path of the zookeeper node
*/
class WorkloadGeneratorStateCleaner(connectionString: String) {
private val timeout: Duration = Duration.ofMillis(500)
......@@ -31,14 +30,14 @@ class WorkloadGeneratorStateCleaner(connectionString: String) {
}
fun deleteState() {
deleteRecusiveAll(this.path)
deleteRecursiveAll(this.path)
logger.info { "ZooKeeper reset was successful" }
}
/**
* Deletes a Zookeeper node and its children with the corresponding path.
*/
private fun deleteRecusiveAll(nodePath: String) {
private fun deleteRecursiveAll(nodePath: String) {
while (true) {
var children: List<String>
......@@ -47,10 +46,10 @@ class WorkloadGeneratorStateCleaner(connectionString: String) {
} catch (e: KeeperException.NoNodeException) {
break;
}
// recursivly delete all children nodes
// recursively delete all children nodes
for (s: String in children) {
try {
deleteRecusiveAll("$nodePath/$s")
deleteRecursiveAll("$nodePath/$s")
} catch (ex: Exception) {
logger.info { "$ex" }
}
......
......@@ -4,10 +4,10 @@ import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.api.model.apps.Deployment
class ReplicaPatcher(private val k8sResource: KubernetesResource): AbstractPatcher(k8sResource){
override fun <Int> patch(replicas: Int) {
override fun <Int> patch(value: Int) {
if (k8sResource is Deployment) {
if (replicas is kotlin.Int) {
this.k8sResource.spec.replicas = replicas
if (value is kotlin.Int) {
this.k8sResource.spec.replicas = value
}
}
}
......
......@@ -8,8 +8,4 @@ import java.time.Duration
data class Config(
val loads: List<LoadDimension>,
val resources: List<Resource>,
val compositeStrategy: CompositeStrategy,
val executionDuration: Duration
) {
}
\ No newline at end of file
val compositeStrategy: CompositeStrategy) {}
\ No newline at end of file
package theodolite.util
import kotlin.properties.Delegates
import org.apache.kafka.clients.admin.NewTopic
class KafkaConfig() {
lateinit var bootstrapSever: String
lateinit var topics: List<Topic>
class Topic() {
lateinit var name: String
var partition by Delegates.notNull<Int>()
var replication by Delegates.notNull<Short>()
lateinit var topics: List<NewTopic>
}
}
\ No newline at end of file
......@@ -5,5 +5,4 @@ class PatcherDefinition() {
lateinit var resource: String
lateinit var container: String
lateinit var variableName: String
lateinit var value: String
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment