Skip to content
Snippets Groups Projects
Commit 5ed38dae authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'theodolite-kotlin' of...

Merge branch 'theodolite-kotlin' of git.se.informatik.uni-kiel.de:she/theodolite into theodolite-kotlin
parents 7911ea12 5ac54e1b
No related branches found
No related tags found
3 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Pipeline #2724 passed
Showing
with 119 additions and 91 deletions
......@@ -11,6 +11,7 @@ import kotlin.properties.Delegates
@JsonDeserialize
@RegisterForReflection
class BenchmarkExecution : CustomResource(), Namespaced {
var executionId: Int = 0
lateinit var name: String
lateinit var benchmark: String
lateinit var load: LoadDefinition
......
......@@ -26,7 +26,6 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
private fun loadKubernetesResources(resources: List<String>): List<Pair<String, KubernetesResource>> {
val parser = YamlParser()
val loader = K8sResourceLoader(DefaultKubernetesClient().inNamespace(namespace))
return resources
.map { resource ->
......
......@@ -9,7 +9,10 @@ import java.time.Instant
private val logger = KotlinLogging.logger {}
class AnalysisExecutor(private val slo: BenchmarkExecution.Slo) {
class AnalysisExecutor(
private val slo: BenchmarkExecution.Slo,
private val executionId: Int
) {
private val fetcher = MetricFetcher(
prometheusURL = slo.prometheusUrl,
......@@ -26,8 +29,7 @@ class AnalysisExecutor(private val slo: BenchmarkExecution.Slo) {
query = "sum by(group)(kafka_consumergroup_group_lag >= 0)"
)
CsvExporter().toCsv(name = "${load.get()}_${res.get()}_${slo.sloType}", prom = prometheusData)
CsvExporter().toCsv(name = "$executionId-${load.get()}-${res.get()}-${slo.sloType}", prom = prometheusData)
val sloChecker = SloCheckerFactory().create(
slotype = slo.sloType,
externalSlopeURL = slo.externalSloUrl,
......
......@@ -25,7 +25,8 @@ abstract class BenchmarkExecutor(
val results: Results,
val executionDuration: Duration,
configurationOverrides: List<ConfigurationOverride?>,
val slo: BenchmarkExecution.Slo
val slo: BenchmarkExecution.Slo,
val executionId: Int
) {
var run: AtomicBoolean = AtomicBoolean(true)
......
......@@ -19,8 +19,9 @@ class BenchmarkExecutorImpl(
results: Results,
executionDuration: Duration,
private val configurationOverrides: List<ConfigurationOverride?>,
slo: BenchmarkExecution.Slo
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo) {
slo: BenchmarkExecution.Slo,
executionId: Int
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo, executionId) {
override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
var result = false
val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides)
......@@ -36,7 +37,7 @@ class BenchmarkExecutorImpl(
if (this.run.get()) {
result =
AnalysisExecutor(slo = slo).analyze(load = load, res = res, executionDuration = executionDuration)
AnalysisExecutor(slo = slo, executionId = executionId).analyze(load = load, res = res, executionDuration = executionDuration)
this.results.setResult(Pair(load, res), result)
}
benchmarkDeployment.teardown()
......
......@@ -23,4 +23,4 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b
deployment.teardown()
logger.info { "Teardown completed" }
}
}
\ No newline at end of file
}
package theodolite.execution
import com.google.gson.GsonBuilder
import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark
import theodolite.patcher.PatcherDefinitionFactory
......@@ -9,6 +10,7 @@ import theodolite.util.Config
import theodolite.util.LoadDimension
import theodolite.util.Resource
import theodolite.util.Results
import java.io.PrintWriter
import java.time.Duration
class TheodoliteExecutor(
......@@ -37,11 +39,12 @@ class TheodoliteExecutor(
executor =
BenchmarkExecutorImpl(
kubernetesBenchmark,
results,
executionDuration,
config.configOverrides,
config.slos[0]
benchmark = kubernetesBenchmark,
results = results,
executionDuration = executionDuration,
configurationOverrides = config.configOverrides,
slo = config.slos[0],
executionId = config.executionId
)
return Config(
......@@ -72,6 +75,9 @@ class TheodoliteExecutor(
}
fun run() {
storeAsFile(this.config, "${this.config.executionId}-execution-configuration")
storeAsFile(kubernetesBenchmark, "${this.config.executionId}-benchmark-configuration")
val config = buildConfig()
// execute benchmarks for each load
for (load in config.loads) {
......@@ -79,5 +85,14 @@ class TheodoliteExecutor(
config.compositeStrategy.findSuitableResource(load, config.resources)
}
}
storeAsFile(config.compositeStrategy.benchmarkExecutor.results, "${this.config.executionId}-result")
}
private fun <T> storeAsFile(saveObject: T, filename: String) {
val gson = GsonBuilder().enableComplexMapKeySerialization().setPrettyPrinting().create()
PrintWriter(filename).use { pw ->
pw.println(gson.toJson(saveObject))
}
}
}
......@@ -28,13 +28,15 @@ class TheodoliteYamlExecutor {
val benchmark =
parser.parse(path = benchmarkPath, E = KubernetesBenchmark::class.java)!!
val shutdown = Shutdown(benchmarkExecution, benchmark)
Runtime.getRuntime().addShutdownHook(thread { shutdown.run()})
// Add shutdown hook
// Use thread{} with start = false, else the thread will start right away
val shutdown = thread(start = false) { Shutdown(benchmarkExecution, benchmark).run() }
Runtime.getRuntime().addShutdownHook(shutdown)
val executor = TheodoliteExecutor(benchmarkExecution, benchmark)
executor.run()
logger.info { "Theodolite finished" }
Runtime.getRuntime().removeShutdownHook(thread { shutdown.run()})
Runtime.getRuntime().removeShutdownHook(shutdown)
exitProcess(0)
}
}
......@@ -2,6 +2,7 @@ package theodolite.execution.operator
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
import khttp.patch
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark
......@@ -10,17 +11,20 @@ import java.lang.Thread.sleep
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
private val logger = KotlinLogging.logger {}
class TheodoliteController(
val client: NamespacedKubernetesClient,
val executionContext: CustomResourceDefinitionContext
val executionContext: CustomResourceDefinitionContext,
val path: String
) {
lateinit var executor: TheodoliteExecutor
val executionsQueue: ConcurrentLinkedDeque<BenchmarkExecution> = ConcurrentLinkedDeque()
val benchmarks: ConcurrentHashMap<String, KubernetesBenchmark> = ConcurrentHashMap()
var isUpdated = AtomicBoolean(false)
var executionID = AtomicInteger(0)
fun run() {
while (true) {
......@@ -59,7 +63,9 @@ class TheodoliteController(
@Synchronized
fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) {
execution.executionId = executionID.getAndSet(executionID.get() + 1)
isUpdated.set(false)
benchmark.path = path
logger.info { "Start execution ${execution.name} with benchmark ${benchmark.name}." }
executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark)
executor.run()
......
......@@ -7,6 +7,7 @@ import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.BenchmarkExecutionList
import theodolite.benchmark.KubernetesBenchmark
import theodolite.benchmark.KubernetesBenchmarkList
import theodolite.k8s.K8sContextFactory
private const val DEFAULT_NAMESPACE = "default"
......@@ -43,7 +44,8 @@ class TheodoliteOperator {
val executionContext = contextFactory.create(API_VERSION, SCOPE, GROUP, EXECUTION_PLURAL)
val benchmarkContext = contextFactory.create(API_VERSION, SCOPE, GROUP, BENCHMARK_PLURAL)
val controller = TheodoliteController(client = client, executionContext = executionContext)
val appResource = System.getenv("THEODOLITE_APP_RESOURCES") ?: "./config"
val controller = TheodoliteController(client = client, executionContext = executionContext, path = appResource)
val informerFactory = client.informers()
val informerExecution = informerFactory.sharedIndexInformerForCustomResource(
......
package theodolite.execution.operator
package theodolite.k8s
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
......
package theodolite.k8s
import io.fabric8.kubernetes.client.CustomResource
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
/**
* Fabric8 handles custom resources as plain HashMaps. These need to be handled differently than normal
* Kubernetes resources. The K8sCustomResourceWrapper class provides a wrapper to deploy and delete
* custom resources in a uniform way.
*
* @property map custom resource as plain hashmap
* @constructor Create empty K8s custom resource wrapper
*/
class K8sCustomResourceWrapper(private val map : Map<String,String>) : CustomResource() {
/**
* Deploy a custom resource
*
* @param client a namespaced Kubernetes client which are used to deploy the CR object.
*/
fun deploy(client : NamespacedKubernetesClient){
val kind = this.map["kind"]
// Search the CustomResourceDefinition to which the CR Object belongs.
// This should be exactly one if the CRD is registered for Kubernetes, zero otherwise.
val crds = client.apiextensions().v1beta1().customResourceDefinitions().list()
crds.items
.filter { crd -> crd.toString().contains("kind=$kind") }
.map { crd -> CustomResourceDefinitionContext.fromCrd(crd) }
.forEach { context ->
client.customResource(context).createOrReplace(client.configuration.namespace,this.map as Map<String, Any>)
}
}
/**
* Delete a custom resource
*
* @param client a namespaced Kubernetes client which are used to delete the CR object.
*/
fun delete(client : NamespacedKubernetesClient){
val kind = this.map["kind"]
val metadata = this.map["metadata"] as HashMap<String,String>
val name = metadata["name"]
val crds = client.apiextensions().v1beta1().customResourceDefinitions().list()
crds.items
.filter { crd -> crd.toString().contains("kind=$kind") }
.map { crd -> CustomResourceDefinitionContext.fromCrd(crd) }
.forEach { context ->
client.customResource(context).delete(client.configuration.namespace,name) }
}
}
\ No newline at end of file
......@@ -19,7 +19,7 @@ class K8sManager(private val client: NamespacedKubernetesClient) {
this.client.configMaps().createOrReplace(resource)
is StatefulSet ->
this.client.apps().statefulSets().createOrReplace(resource)
is K8sCustomResourceWrapper -> resource.deploy(client)
is ServiceMonitorWrapper -> resource.deploy(client)
else -> throw IllegalArgumentException("Unknown Kubernetes resource.")
}
}
......@@ -34,7 +34,7 @@ class K8sManager(private val client: NamespacedKubernetesClient) {
this.client.configMaps().delete(resource)
is StatefulSet ->
this.client.apps().statefulSets().delete(resource)
is K8sCustomResourceWrapper -> resource.delete(client)
is ServiceMonitorWrapper -> resource.delete(client)
else -> throw IllegalArgumentException("Unknown Kubernetes resource.")
}
}
......
......@@ -26,8 +26,8 @@ class K8sResourceLoader(private val client: NamespacedKubernetesClient) {
* @param path of the yaml file
* @return customResource from fabric8
*/
fun loadCustomResource(path: String): K8sCustomResourceWrapper {
return loadGenericResource(path) { x: String -> K8sCustomResourceWrapper(YamlParser().parse(path, HashMap<String, String>()::class.java)!!) }
private fun loadServiceMonitor(path: String): ServiceMonitorWrapper {
return loadGenericResource(path) { x: String -> ServiceMonitorWrapper(YamlParser().parse(path, HashMap<String, String>()::class.java)!!) }
}
/**
......@@ -73,16 +73,11 @@ class K8sResourceLoader(private val client: NamespacedKubernetesClient) {
return when (kind) {
"Deployment" -> loadDeployment(path)
"Service" -> loadService(path)
"ServiceMonitor" -> loadCustomResource(path)
"ServiceMonitor" -> loadServiceMonitor(path)
"ConfigMap" -> loadConfigmap(path)
else -> {
logger.warn { "Try to load $kind from $path as Custom ressource" }
try{
loadCustomResource(path)
} catch (e:Exception){
logger.error { "Error during loading of unspecified CustomResource: $e" }
throw e
}
logger.error { "Error during loading of unspecified resource Kind" }
throw java.lang.IllegalArgumentException("error while loading resource with kind: $kind")
}
}
}
......
package theodolite.k8s
import io.fabric8.kubernetes.client.CustomResource
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import mu.KotlinLogging
private val logger = KotlinLogging.logger {}
class ServiceMonitorWrapper(private val serviceMonitor: Map<String, String>) : CustomResource() {
/**
* Deploy a service monitor
*
* @param client a namespaced Kubernetes client which are used to deploy the CR object.
*
* @throws java.io.IOException if the resource could not be deployed.
*/
fun deploy(client: NamespacedKubernetesClient) {
val serviceMonitorContext = K8sContextFactory().create(
api = "v1",
scope = "Namespaced",
group = "monitoring.coreos.com",
plural = "servicemonitors"
)
client.customResource(serviceMonitorContext)
.createOrReplace(client.configuration.namespace, this.serviceMonitor as Map<String, Any>)
}
/**
* Delete a service monitor
*
* @param client a namespaced Kubernetes client which are used to delete the CR object.
*/
fun delete(client: NamespacedKubernetesClient) {
val serviceMonitorContext = K8sContextFactory().create(
api = "v1",
scope = "Namespaced",
group = "monitoring.coreos.com",
plural = "servicemonitors"
)
try {
client.customResource(serviceMonitorContext)
.delete(client.configuration.namespace, this.getServiceMonitorName())
} catch (e: Exception) {
logger.warn { "Could not delete service monitor" }
}
}
/**
* @throws NullPointerException if name or metadata is null
*/
private fun getServiceMonitorName(): String {
val smAsMap = this.serviceMonitor["metadata"]!! as Map<String, String>
return smAsMap["name"]!!
}
}
......@@ -31,7 +31,7 @@ class CompositeStrategyTest {
val results = Results()
val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker)
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0)
val linearSearch = LinearSearch(benchmarkExecutor)
val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy =
......@@ -65,7 +65,7 @@ class CompositeStrategyTest {
val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutorImpl =
TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker)
TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0)
val binarySearch = BinarySearch(benchmarkExecutorImpl)
val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy =
......@@ -98,7 +98,7 @@ class CompositeStrategyTest {
val results = Results()
val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker)
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0)
val binarySearch = BinarySearch(benchmarkExecutor)
val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy =
......
......@@ -12,14 +12,16 @@ class TestBenchmarkExecutorImpl(
private val mockResults: Array<Array<Boolean>>,
benchmark: Benchmark,
results: Results,
slo: BenchmarkExecution.Slo
slo: BenchmarkExecution.Slo,
executionId: Int
) :
BenchmarkExecutor(
benchmark,
results,
executionDuration = Duration.ofSeconds(1),
configurationOverrides = emptyList(),
slo = slo
slo = slo,
executionId = executionId
) {
override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment