Commit a8fe5a8a authored by Lorenz Boguhn

resolve merge-conflict

parents bc6108a0 fc153c62
Part of 4 merge requests: !159 Re-implementation of Theodolite with Kotlin/Quarkus, !157 Update Graal Image in CI pipeline, !134 Refactoring, !83 WIP: Re-implementation of Theodolite with Kotlin/Quarkus
@@ -225,7 +225,17 @@ Theodolite locally on your machine see the description below.
see the [Configuration](#configuration) section below. Note that you might uncomment the `serviceAccountName` line if
RBAC is enabled on your cluster (see installation of [Theodolite RBAC](#Theodolite-RBAC)).
To start the execution of a benchmark, create a ConfigMap that contains all required Kubernetes resource files for the SUT and the load generator, a ConfigMap for the execution, and a ConfigMap for the benchmark.
```sh
kubectl create configmap app-resources-configmap --from-file=<folder-with-all-required-k8s-resources>
kubectl create configmap execution-configmap --from-file=<execution.yaml>
kubectl create configmap benchmark-configmap --from-file=<benchmark.yaml>
```
This will create three ConfigMaps. You can verify this via `kubectl get configmaps`.
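The presence of all three ConfigMaps can also be checked mechanically. A minimal sketch, assuming the ConfigMap names used above; the helper and the sample output are illustrative, not part of Theodolite:

```python
# Check `kubectl get configmaps` output for the three expected ConfigMaps.
EXPECTED = {"app-resources-configmap", "execution-configmap", "benchmark-configmap"}

def missing_configmaps(kubectl_output: str) -> set:
    """Return the expected ConfigMap names absent from `kubectl get configmaps` output."""
    # Skip the header row; the first column of each remaining row is the name.
    present = {line.split()[0] for line in kubectl_output.splitlines()[1:] if line.strip()}
    return EXPECTED - present

# Example: output where one ConfigMap is still missing.
sample = """NAME                      DATA   AGE
app-resources-configmap   3      1m
execution-configmap       1      1m"""
print(missing_configmaps(sample))  # {'benchmark-configmap'}
```
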
Start the Theodolite job (with `<your-theodolite-yaml>` being your job definition):
```sh
kubectl create -f <your-theodolite-yaml>
```
@@ -241,24 +251,7 @@ Kubernetes volume.
### Configuration
Be sure that the names of the ConfigMaps correspond to the specifications of the mounted `configmaps`, `volumes`, and `mountPath`. In particular, the names of the execution file and the benchmark file must match the values of the corresponding environment variables.

| Command line | Kubernetes | Description |
| -------------------- | ------------------- | ------------------------------------------------------------ |
| --uc | UC | **[Mandatory]** Stream processing use case to be benchmarked. Has to be one of `1`, `2`, `3` or `4`. |
| --loads | LOADS | **[Mandatory]** Values for the workload generator to be tested, should be sorted in ascending order. |
| --instances | INSTANCES | **[Mandatory]** Numbers of instances to be benchmarked, should be sorted in ascending order. |
| --duration | DURATION | Duration in minutes subexperiments should be executed for. *Default:* `5`. |
| --partitions | PARTITIONS | Number of partitions for Kafka topics. *Default:* `40`. |
| --cpu-limit | CPU_LIMIT | Kubernetes CPU limit for a single Pod. *Default:* `1000m`. |
| --memory-limit | MEMORY_LIMIT | Kubernetes memory limit for a single Pod. *Default:* `4Gi`. |
| --domain-restriction | DOMAIN_RESTRICTION | A flag that indicates that domain restriction should be used. *Default:* not set. For more details see Section [Domain Restriction](#domain-restriction). |
| --search-strategy | SEARCH_STRATEGY | The benchmarking search strategy. Can be set to `check-all`, `linear-search` or `binary-search`. *Default:* `check-all`. For more details see Section [Benchmarking Search Strategies](#benchmarking-search-strategies). |
| --reset | RESET | Resets the environment before each subexperiment. Useful if execution was aborted and just one experiment should be executed. |
| --reset-only | RESET_ONLY | Only resets the environment. Ignores all other parameters. Useful if execution was aborted and one wants a clean state for new executions. |
| --namespace | NAMESPACE | Kubernetes namespace. *Default:* `default`. |
| --prometheus | PROMETHEUS_BASE_URL | Defines where to find the Prometheus instance. *Default:* `http://localhost:9090` |
| --path | RESULT_PATH | A directory path for the results. Relative to the Execution folder. *Default:* `results` |
| --configurations | CONFIGURATIONS | Defines environment variables for the use cases and, thus, enables further configuration options. |
| --threshold | THRESHOLD | The threshold for the trend slope that the search strategies use to determine whether a load could be handled. *Default:* `2000` |
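The correspondence rule above (ConfigMap file names must match the paths set in the environment variables) can be sketched as a simple check. The helper below is hypothetical, for illustration only:

```python
import os.path

# The value of THEODOLITE_EXECUTION / THEODOLITE_BENCHMARK is a path inside the
# container; its basename must equal the filename the ConfigMap was created from.
def names_match(env_value: str, configmap_source_file: str) -> bool:
    return os.path.basename(env_value) == os.path.basename(configmap_source_file)

print(names_match("/etc/execution/example-execution-yaml-resource.yaml",
                  "example-execution-yaml-resource.yaml"))  # True
```
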
### Domain Restriction
...
@@ -5,47 +5,60 @@ metadata:
spec:
  template:
    spec:
      securityContext:
        runAsUser: 0 # Set the permissions for write access to the volumes.
      containers:
        - name: lag-analysis
          image: ghcr.io/cau-se/theodolite-slo-checker-lag-trend:theodolite-kotlin-latest
          ports:
            - containerPort: 80
              name: analysis
        - name: theodolite
          image: ghcr.io/cau-se/theodolite:theodolite-kotlin-latest
          imagePullPolicy: Always
          env:
            - name: NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            # - name: MODE
            #   value: yaml-executor # Default is `yaml-executor`
            - name: THEODOLITE_EXECUTION
              value: /etc/execution/example-execution-yaml-resource.yaml # Must match the filename of the execution file from which the ConfigMap is created.
            - name: THEODOLITE_BENCHMARK
              value: /etc/benchmark/example-benchmark-yaml-resource.yaml # Must match the filename of the benchmark file from which the ConfigMap is created.
            - name: THEODOLITE_APP_RESOURCES
              value: /etc/app-resources
            - name: RESULTS_FOLDER # Folder for saving results
              value: results # Default is the pwd (/deployments)
            # - name: CREATE_RESULTS_FOLDER # Specify whether the result folder should be created if it does not exist.
            #   value: "false" # Default is false.
          volumeMounts:
            - mountPath: "/deployments/results" # Must correspond to the value of `RESULTS_FOLDER`.
              name: theodolite-pv-storage
            - mountPath: "/etc/app-resources" # Must correspond to the value of `THEODOLITE_APP_RESOURCES`.
              name: app-resources
            - mountPath: "/etc/benchmark" # Must correspond to the value of `THEODOLITE_BENCHMARK`.
              name: benchmark
            - mountPath: "/etc/execution" # Must correspond to the value of `THEODOLITE_EXECUTION`.
              name: execution
      restartPolicy: Never
      # Uncomment if RBAC is enabled and configured
      serviceAccountName: theodolite
      # Multiple volumes are needed to provide the corresponding files.
      # The names must correspond to the created ConfigMaps and the volumeMounts.
      volumes:
        - name: theodolite-pv-storage
          persistentVolumeClaim:
            claimName: theodolite-pv-claim
        - name: app-resources
          configMap:
            name: app-resources-configmap
        - name: benchmark
          configMap:
            name: benchmark-configmap
        - name: execution
          configMap:
            name: execution-configmap
  backoffLimit: 4
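As the comments in the job definition note, every `volumeMounts` entry must reference a declared volume, and the volume names must match the created ConfigMaps. A small sketch of that consistency check, using plain dicts standing in for the parsed Job manifest (names taken from the example above):

```python
# Each volumeMount must reference a declared volume.
volumes = ["theodolite-pv-storage", "app-resources", "benchmark", "execution"]
volume_mounts = {
    "/deployments/results": "theodolite-pv-storage",
    "/etc/app-resources": "app-resources",
    "/etc/benchmark": "benchmark",
    "/etc/execution": "execution",
}

def unmatched_mounts(volumes, volume_mounts):
    """Return the mounts whose volume name is not declared in `volumes`."""
    declared = set(volumes)
    return {path: name for path, name in volume_mounts.items() if name not in declared}

print(unmatched_mounts(volumes, volume_mounts))  # {}
```
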
@@ -15,12 +15,12 @@
#
###
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3
WORKDIR /deployments
RUN chown 1001 /deployments \
    && chmod "g+rwX" /deployments \
    && chown 1001:root /deployments
COPY --chown=1001:root build/*-runner /deployments/application
COPY config/ /deployments/config/
EXPOSE 8080
USER 1001
...
@@ -3,11 +3,14 @@ package theodolite.benchmark
import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.quarkus.runtime.annotations.RegisterForReflection
import mu.KotlinLogging
import org.apache.kafka.clients.admin.NewTopic
import theodolite.k8s.K8sManager
import theodolite.k8s.TopicManager
import theodolite.util.KafkaConfig

private val logger = KotlinLogging.logger {}

/**
 * Organizes the deployment of benchmarks in Kubernetes.
 *
@@ -26,7 +29,8 @@ class KubernetesBenchmarkDeployment(
) : BenchmarkDeployment {
    private val kafkaController = TopicManager(this.kafkaConfig)
    private val kubernetesManager = K8sManager(client)
    private val LAG_EXPORTER_POD_LABEL = "app.kubernetes.io/name=kafka-lag-exporter"
    private val SLEEP_AFTER_TEARDOWN = 5000L
    /**
     * Setup a [KubernetesBenchmark] using the [TopicManager] and the [K8sManager]:
@@ -37,9 +41,7 @@ class KubernetesBenchmarkDeployment(
        val kafkaTopics = this.topics.filter { !it.removeOnly }
            .map { NewTopic(it.name, it.numPartitions, it.replicationFactor) }
        kafkaController.createTopics(kafkaTopics)
        resources.forEach { kubernetesManager.deploy(it) }
    }
    /**
@@ -53,6 +55,8 @@ class KubernetesBenchmarkDeployment(
            kubernetesManager.remove(it)
        }
        kafkaController.removeTopics(this.topics.map { topic -> topic.name })
        KafkaLagExporterRemover(client).remove(LAG_EXPORTER_POD_LABEL)
        logger.info { "Teardown complete. Wait $SLEEP_AFTER_TEARDOWN ms to let everything come down." }
        Thread.sleep(SLEEP_AFTER_TEARDOWN)
    }
}
@@ -44,8 +44,13 @@ class AnalysisExecutor(
            query = "sum by(group)(kafka_consumergroup_group_lag >= 0)"
        )

        var resultsFolder: String = System.getenv("RESULTS_FOLDER")
        if (resultsFolder.isNotEmpty()) {
            resultsFolder += "/"
        }

        CsvExporter().toCsv(
            name = "${resultsFolder}exp${executionId}_${load.get()}_${res.get()}_${slo.sloType.toSlug()}",
            prom = prometheusData
        )
        val sloChecker = SloCheckerFactory().create(
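The file-naming logic above (a non-empty `RESULTS_FOLDER` becomes a path prefix of the exported CSV name) can be mirrored in a few lines. A sketch for illustration only; the function name and example values are not part of Theodolite:

```python
def csv_name(results_folder: str, execution_id: int, load: int, res: int, slo_slug: str) -> str:
    # A non-empty RESULTS_FOLDER is turned into a path prefix, as in AnalysisExecutor.
    prefix = results_folder + "/" if results_folder else ""
    return f"{prefix}exp{execution_id}_{load}_{res}_{slo_slug}"

print(csv_name("results", 0, 100000, 1, "lag-trend"))  # results/exp0_100000_1_lag-trend
print(csv_name("", 0, 100000, 1, "lag-trend"))         # exp0_100000_1_lag-trend
```
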
...
@@ -24,9 +24,9 @@ class CsvExporter {
        val csvOutputFile = File("$name.csv")
        PrintWriter(csvOutputFile).use { pw ->
            pw.println(listOf("group", "timestamp", "value").joinToString(separator = ","))
            responseArray.forEach {
                pw.println(it.joinToString(separator = ","))
            }
        }
        logger.info { "Wrote CSV file: $name to ${csvOutputFile.absolutePath}." }
...
@@ -5,6 +5,7 @@ import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark
import theodolite.util.LoadDimension
import theodolite.util.Resource
import java.lang.Exception

private val logger = KotlinLogging.logger {}

@@ -23,6 +24,7 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b
     */
    override fun run() {
        // Build configuration for the teardown
        try {
            logger.info { "Received shutdown signal -> Shutting down" }
            val deployment =
                benchmark.buildDeployment(
@@ -30,8 +32,13 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b
                    res = Resource(0, emptyList()),
                    configurationOverrides = benchmarkExecution.configOverrides
                )
            deployment.teardown()
        } catch (e: Exception) {
            logger.warn {
                "Could not delete all specified resources from Kubernetes. " +
                    "This can happen if not all resources were deployed and running."
            }
        }
        logger.info { "Teardown everything deployed" }
        logger.info { "Teardown completed" }
    }
}
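The shutdown hook wraps the teardown in a try/catch so that a partially deployed benchmark cannot abort the shutdown. The same best-effort pattern, sketched in Python for illustration (the function names are hypothetical):

```python
import logging

def shutdown(teardown):
    """Best-effort teardown: a failure is logged but must not abort the shutdown hook."""
    try:
        teardown()
    except Exception:
        logging.warning("Could not delete all specified resources; "
                        "some may not have been deployed and running.")
    return "teardown completed"

def failing_teardown():
    # Simulate a resource that was never deployed.
    raise RuntimeError("resource not found")

print(shutdown(failing_teardown))  # teardown completed
```
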
package theodolite.execution

import com.google.gson.GsonBuilder
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark
import theodolite.patcher.PatcherDefinitionFactory
@@ -10,9 +11,17 @@ import theodolite.util.Config
import theodolite.util.LoadDimension
import theodolite.util.Resource
import theodolite.util.Results
import java.io.File
import java.io.PrintWriter
import java.lang.IllegalArgumentException
import java.lang.Thread.sleep
import java.nio.file.Files
import java.nio.file.Path
import java.time.Duration

private val logger = KotlinLogging.logger {}

/**
 * The Theodolite executor runs all the experiments defined with the given execution and benchmark configuration.
 *
@@ -92,13 +101,34 @@ class TheodoliteExecutor(
        return this.kubernetesBenchmark
    }
    private fun getResultFolderString(): String {
        var resultsFolder: String = System.getenv("RESULTS_FOLDER") ?: ""
        val createResultsFolder = System.getenv("CREATE_RESULTS_FOLDER") ?: "false"

        if (resultsFolder != "") {
            logger.info { "RESULTS_FOLDER: $resultsFolder" }
            val directory = File(resultsFolder)
            if (!directory.exists()) {
                logger.error { "Folder $resultsFolder does not exist" }
                if (createResultsFolder.toBoolean()) {
                    directory.mkdirs()
                } else {
                    throw IllegalArgumentException("Result folder not found")
                }
            }
            resultsFolder += "/"
        }
        return resultsFolder
    }
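The folder-resolution rules (empty `RESULTS_FOLDER` means the working directory; otherwise the folder must exist or be created when `CREATE_RESULTS_FOLDER` is `"true"`) can be mirrored compactly. A sketch under those assumptions; the function is illustrative, not Theodolite's API:

```python
import os
import tempfile

def resolve_results_folder(results_folder: str, create: str) -> str:
    """Mirror getResultFolderString: return a path prefix for result files."""
    if results_folder == "":
        return ""  # results land in the working directory
    if not os.path.isdir(results_folder):
        if create.lower() == "true":
            os.makedirs(results_folder)
        else:
            raise ValueError("Result folder not found")
    return results_folder + "/"

with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "results")
    print(resolve_results_folder(target, "true").endswith("/"))  # True
```
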
    /**
     * Run all experiments which are specified in the corresponding
     * execution and benchmark objects.
     */
    fun run() {
        val resultsFolder = getResultFolderString()
        storeAsFile(this.config, "$resultsFolder${this.config.executionId}-execution-configuration")
        storeAsFile(kubernetesBenchmark, "$resultsFolder${this.config.executionId}-benchmark-configuration")

        val config = buildConfig()
        // execute benchmarks for each load
@@ -107,7 +137,7 @@ class TheodoliteExecutor(
                config.compositeStrategy.findSuitableResource(load, config.resources)
            }
        }
        storeAsFile(config.compositeStrategy.benchmarkExecutor.results, "$resultsFolder${this.config.executionId}-result")
    }

    private fun <T> storeAsFile(saveObject: T, filename: String) {
...
@@ -4,6 +4,7 @@ import mu.KotlinLogging
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.CreateTopicsResult
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.errors.TopicExistsException
import java.lang.Thread.sleep

private val logger = KotlinLogging.logger {}
@@ -29,13 +30,12 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
        try {
            result = kafkaAdmin.createTopics(newTopics)
            result.all().get() // wait for the future to be completed
        } catch (e: Exception) {
            logger.warn(e) { "Error during topic creation." }
            logger.debug { e } // TODO remove due to attached exception to warn log?
            logger.info { "Remove existing topics." }
            delete(newTopics.map { topic -> topic.name() }, kafkaAdmin)
            logger.info { "Will retry the topic creation in ${RETRY_TIME / 1000} seconds." }
            sleep(RETRY_TIME)
            retryCreation = true
        }
@@ -43,7 +43,9 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
        logger.info {
            "Topics creation finished with result: ${
                result
                    .values()
                    .map { it.key + ": " + it.value.isDone }
                    .joinToString(separator = ",")
            } "
        }
@@ -94,17 +96,15 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
            }
        } catch (e: Exception) {
            logger.error(e) { "Error while removing topics: $e" }
            logger.info { "Existing topics are: ${kafkaAdmin.listTopics().names().get()}." }
        }

        val toDelete = topics.filter { kafkaAdmin.listTopics().names().get().contains(it) }

        if (toDelete.isNullOrEmpty()) {
            deleted = true
        } else {
            logger.info { "Deletion of Kafka topics failed, will retry in ${RETRY_TIME / 1000} seconds." }
            sleep(RETRY_TIME)
        }
    }
...