Skip to content
Snippets Groups Projects
Commit 7437021d authored by Simon Ehrenstein's avatar Simon Ehrenstein
Browse files

Begin to restructure in order to introduce the benchmark type

parent 96b94d81
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!85Introduce new Benchmark class and Patcher,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing
with 152 additions and 25 deletions
package theodolite
import io.quarkus.runtime.annotations.QuarkusMain
import theodolite.execution.TheodoliteExecutor
import mu.KotlinLogging
import theodolite.benchmark.TheodoliteYamlExecutor
private val logger = KotlinLogging.logger {}
......@@ -10,7 +10,8 @@ private val logger = KotlinLogging.logger {}
object Main {
@JvmStatic
fun main(args: Array<String>) {
val theodolite = TheodoliteExecutor()
//val theodolite = TheodoliteExecutor()
val theodolite = TheodoliteYamlExecutor()
theodolite.run()
logger.info("Application started")
}
......
package theodolite.benchmark
import theodolite.util.LoadDimension
import theodolite.util.Resource
interface Benchmark {
fun buildDeployment(load: LoadDimension, res: Resource): BenchmarkDeployment
}
\ No newline at end of file
package theodolite.benchmark
interface BenchmarkDeployment {
fun setup()
fun teardown()
}
\ No newline at end of file
package theodolite.benchmark
import org.yaml.snakeyaml.Yaml
import java.io.File
import java.io.FileInputStream
import java.io.InputStream
class BenchmarkYamlParser<T>: Parser<T> {
override fun parse(path: String): T {
val input: InputStream = FileInputStream(File(path))
val parser = Yaml()
return parser.load<T>(input)
}
}
\ No newline at end of file
package theodolite.benchmark
import theodolite.util.LoadDimension
import theodolite.util.Resource
class KubernetesBenchmark(): Benchmark {
lateinit var name: String
override fun buildDeployment(load: LoadDimension, res: Resource): BenchmarkDeployment {
TODO("Not yet implemented")
}
}
\ No newline at end of file
package theodolite.benchmark
import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.apache.kafka.clients.admin.NewTopic
import theodolite.k8s.K8sManager
import theodolite.k8s.TopicManager
import theodolite.k8s.WorkloadGeneratorStateCleaner
class KubernetesBenchmarkDeployment(
val resources: List<KubernetesResource>, // List of already patched resources
private val kafkaConfig: HashMap<String, Any>,
private val zookeeperConfig: String,
private val topics: Collection<NewTopic>
// Maybe more
): BenchmarkDeployment {
private val workloadGeneratorStateCleaner = WorkloadGeneratorStateCleaner(this.zookeeperConfig)
private val kafkaController = TopicManager(this.kafkaConfig)
private val kubernetesManager = K8sManager(DefaultKubernetesClient().inNamespace("default")) // Maybe per resource type
override fun setup() {
this.workloadGeneratorStateCleaner.deleteState()
kafkaController.createTopics(this.topics)
resources.forEach {
kubernetesManager.deploy(it)
}
}
override fun teardown() {
this.workloadGeneratorStateCleaner.deleteState()
kafkaController.removeTopics(this.topics.map { topic -> topic.name() })
resources.forEach {
kubernetesManager.remove(it)
}
}
}
package theodolite.benchmark
interface Parser<T> {
fun parse(path: String): T //Yaml
}
\ No newline at end of file
package theodolite.benchmark
class TheodoliteYamlExecutor {
fun run() {
val parser = BenchmarkYamlParser<KubernetesBenchmark>()
val benchmark= parser.parse("./../../../resources/main/yaml/test.yaml")
System.out.println(benchmark.name)
}
}
\ No newline at end of file
package theodolite.k8s
import io.fabric8.kubernetes.api.model.ConfigMap
import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.api.model.Service
import io.fabric8.kubernetes.api.model.Volume
import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition
import io.fabric8.kubernetes.api.model.apps.Deployment
import io.fabric8.kubernetes.api.model.apps.StatefulSet
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import java.lang.IllegalArgumentException
class K8sManager(private val client: NamespacedKubernetesClient) {
fun deploy(resource: KubernetesResource) {
when (resource) {
is Deployment ->
this.client.apps().deployments().createOrReplace(resource)
is Service ->
this.client.services().createOrReplace(resource)
is ConfigMap ->
this.client.configMaps().createOrReplace(resource)
is StatefulSet ->
this.client.apps().statefulSets().createOrReplace(resource)
else -> throw IllegalArgumentException("Unknown kubernetes resource.")
}
}
fun remove(resource: KubernetesResource) {
when (resource) {
is Deployment ->
this.client.apps().deployments().delete(resource)
is Service ->
this.client.services().delete(resource)
is ConfigMap ->
this.client.configMaps().delete(resource)
is StatefulSet ->
this.client.apps().statefulSets().delete(resource)
else -> throw IllegalArgumentException("Unknown kubernetes resource.")
}
}
}
\ No newline at end of file
......@@ -12,13 +12,12 @@ private val logger = KotlinLogging.logger {}
* Manages the topics related tasks
* @param bootstrapServers Ip of the kafka server
*/
class TopicManager(bootstrapServers: String) {
private val props = hashMapOf<String, Any>(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers)
class TopicManager(kafkaConfig: HashMap<String, Any>) {
lateinit var kafkaAdmin: AdminClient
init {
try {
kafkaAdmin = AdminClient.create(props)
kafkaAdmin = AdminClient.create(kafkaConfig)
} catch (e: Exception) {
logger.error { e.toString() }
}
......@@ -29,19 +28,12 @@ class TopicManager(bootstrapServers: String) {
* @param topics Map that holds a numPartition for each topic it should create
* @param replicationFactor
*/
fun createTopics(topics: Map<String, Int>, replicationFactor: Short) {
val newTopics = mutableSetOf<NewTopic>()
for (i in topics) {
val tops = NewTopic(i.key, i.value, replicationFactor)
newTopics.add(tops)
}
fun createTopics(newTopics: Collection<NewTopic>) {
kafkaAdmin.createTopics(newTopics)
logger.info { "Topics created" }
}
fun createTopics(topics: List<String>, numPartitions: Int, replicationFactor: Short) {
val newTopics = mutableSetOf<NewTopic>()
for (i in topics) {
val tops = NewTopic(i, numPartitions, replicationFactor)
......@@ -52,10 +44,10 @@ class TopicManager(bootstrapServers: String) {
}
/**
* Deletes topics.
* Removes topics.
* @param topics
*/
fun deleteTopics(topics: List<String>) {
fun removeTopics(topics: List<String>) {
val result = kafkaAdmin.deleteTopics(topics)
......@@ -64,7 +56,7 @@ class TopicManager(bootstrapServers: String) {
} catch (ex: Exception) {
logger.error { ex.toString() }
}
logger.info { "Topics deleted" }
logger.info { "Topics removed" }
}
fun getTopics(): ListTopicsResult? {
......
......@@ -29,8 +29,8 @@ class UC1Benchmark(config: Config) : AbstractBenchmark(config) {
init {
this.workloadGeneratorStateCleaner =
WorkloadGeneratorStateCleaner(this.config.externalZookeeperConnectionString, path = "/workload-generation")
this.topicManager = TopicManager(this.config.externalKafkaConnectionString)
WorkloadGeneratorStateCleaner(this.config.externalZookeeperConnectionString)
this.topicManager = TopicManager(hashMapOf("bootstrap.servers" to this.config.externalKafkaConnectionString))
this.kubernetesClient = DefaultKubernetesClient().inNamespace("default")
this.yamlLoader = YamlLoader(this.kubernetesClient)
this.deploymentManager = DeploymentManager(this.kubernetesClient)
......@@ -43,8 +43,8 @@ class UC1Benchmark(config: Config) : AbstractBenchmark(config) {
}
override fun clearClusterEnvironment() {
this.workloadGeneratorStateCleaner.deleteAll()
this.topicManager.deleteTopics(this.config.kafkaTopics)
this.workloadGeneratorStateCleaner.deleteState()
this.topicManager.removeTopics(this.config.kafkaTopics)
this.deploymentManager.delete(this.ucDeployment)
this.serviceManager.delete(this.ucService)
this.deploymentManager.delete(this.wgDeployment)
......@@ -57,7 +57,6 @@ class UC1Benchmark(config: Config) : AbstractBenchmark(config) {
this.config.kafkaReplication
)
}
override fun startSUT(resources: Resource) {
this.deploymentManager.setImageName(ucDeployment, "uc-application", this.config.ucImageURL)
......
......@@ -12,24 +12,25 @@ private val logger = KotlinLogging.logger {}
/**
* Resets the workloadgenerator states in zookeper (and potentially watches for Zookeper events)
*
* @param ip of zookeeper
* @param connectionString of zookeeper
* @param path path of the zookeeper node
*/
class WorkloadGeneratorStateCleaner(ip: String, val path: String) {
class WorkloadGeneratorStateCleaner(connectionString: String) {
private val timeout: Duration = Duration.ofMillis(500)
private val retryAfter: Duration = Duration.ofSeconds(5)
lateinit var zookeeperClient: ZooKeeper
private val path = "/workload-generation"
init {
try {
val watcher: Watcher = ZookeeperWatcher() // defined below
zookeeperClient = ZooKeeper(ip, timeout.toMillis().toInt(), watcher)
zookeeperClient = ZooKeeper(connectionString, timeout.toMillis().toInt(), watcher)
} catch (e: Exception) {
logger.error { e.toString() }
}
}
fun deleteAll() {
fun deleteState() {
deleteRecusiveAll(this.path)
logger.info { "ZooKeeper reset was successful" }
}
......
name: "theodolite ist cool"
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment