Skip to content
Snippets Groups Projects
Commit aa2e6120 authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

Merge upstream master

parents 1ffbd095 be45d0f5
No related branches found
No related tags found
1 merge request!171Introduce ResourceSets to make loading of resource files more flexible
Showing
with 288 additions and 146 deletions
......@@ -68,5 +68,11 @@ rules:
- get
- create
- update
- apiGroups:
- ""
resources:
- events
verbs:
- create
{{- end }}
{{- end }}
\ No newline at end of file
......@@ -155,7 +155,10 @@ cp-helm-charts:
###
kafka-lag-exporter:
enabled: true
image:
pullPolicy: IfNotPresent
nodeSelector: {}
clusters:
- name: "theodolite-cp-kafka"
bootstrapBrokers: "theodolite-cp-kafka:9092"
......
apiVersion: v1
data:
aggregation-service.yaml: "apiVersion: v1\nkind: Service\nmetadata: \n name: titan-ccp-aggregation\n
\ labels:\n app: titan-ccp-aggregation\nspec:\n #type: NodePort\n selector:
\ \n app: titan-ccp-aggregation\n ports: \n - name: http\n port: 80\n
\ targetPort: 80\n protocol: TCP\n - name: metrics\n port: 5556\n"
jmx-configmap.yaml: |
apiVersion: v1
kind: ConfigMap
metadata:
name: aggregation-jmx-configmap
name: example-configmap
data:
jmx-kafka-prometheus.yml: |+
jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi
lowercaseOutputName: true
lowercaseOutputLabelNames: true
ssl: false
service-monitor.yaml: |
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
labels:
app: titan-ccp-aggregation
appScope: titan-ccp
name: titan-ccp-aggregation
spec:
selector:
matchLabels:
app: titan-ccp-aggregation
endpoints:
- port: metrics
interval: 10s
uc1-kstreams-deployment.yaml: |-
apiVersion: apps/v1
kind: Deployment
......@@ -49,9 +22,6 @@ data:
containers:
- name: uc-application
image: ghcr.io/cau-se/theodolite-uc1-kstreams-app:latest
ports:
- containerPort: 5555
name: jmx
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "theodolite-cp-kafka:9092"
......@@ -65,27 +35,6 @@ data:
limits:
memory: 4Gi
cpu: 1000m
- name: prometheus-jmx-exporter
image: "solsson/kafka-prometheus-jmx-exporter@sha256:6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143"
command:
- java
- -XX:+UnlockExperimentalVMOptions
- -XX:+UseCGroupMemoryLimitForHeap
- -XX:MaxRAMFraction=1
- -XshowSettings:vm
- -jar
- jmx_prometheus_httpserver.jar
- "5556"
- /etc/jmx-aggregation/jmx-kafka-prometheus.yml
ports:
- containerPort: 5556
volumeMounts:
- name: jmx-config
mountPath: /etc/jmx-aggregation
volumes:
- name: jmx-config
configMap:
name: aggregation-jmx-configmap
uc1-load-generator-deployment.yaml: |
apiVersion: apps/v1
kind: Deployment
......@@ -109,10 +58,6 @@ data:
- containerPort: 5701
name: coordination
env:
- name: NUM_SENSORS
value: "25000"
- name: NUM_NESTED_GROUPS
value: "5"
- name: KUBERNETES_NAMESPACE
valueFrom:
fieldRef:
......@@ -140,21 +85,3 @@ data:
port: 5701
targetPort: 5701
protocol: TCP
\ No newline at end of file
uc1-service-monitor.yaml: |
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
labels:
app: titan-ccp-aggregation
appScope: titan-ccp
name: titan-ccp-aggregation
spec:
selector:
matchLabels:
app: titan-ccp-aggregation
endpoints:
- port: metrics
interval: 10s
kind: ConfigMap
metadata:
name: example-configmap
......@@ -2,6 +2,7 @@ package theodolite.evaluation
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import theodolite.util.EvaluationFailedException
import theodolite.util.IOHandler
import theodolite.util.LoadDimension
import theodolite.util.Resource
......@@ -12,7 +13,6 @@ import java.util.*
import java.util.regex.Pattern
private val logger = KotlinLogging.logger {}
private val RECORD_LAG_QUERY = "sum by(group)(kafka_consumergroup_group_lag >= 0)"
/**
* Contains the analysis. Fetches a metric from Prometheus, documents it, and evaluates it.
......@@ -37,7 +37,7 @@ class AnalysisExecutor(
* @return true if the experiment succeeded.
*/
fun analyze(load: LoadDimension, res: Resource, executionIntervals: List<Pair<Instant, Instant>>): Boolean {
var result = false
var result: Boolean
var repetitionCounter = 1
try {
......@@ -50,7 +50,7 @@ class AnalysisExecutor(
fetcher.fetchMetric(
start = interval.first,
end = interval.second,
query = RECORD_LAG_QUERY
query = SloConfigHandler.getQueryString(sloType = slo.sloType)
)
}
......@@ -58,7 +58,7 @@ class AnalysisExecutor(
ioHandler.writeToCSVFile(
fileURL = "${fileURL}_${repetitionCounter++}",
data = data.getResultAsList(),
columns = listOf("group", "timestamp", "value")
columns = listOf("labels", "timestamp", "value")
)
}
......@@ -71,8 +71,7 @@ class AnalysisExecutor(
result = sloChecker.evaluate(prometheusData)
} catch (e: Exception) {
// TODO(throw exception in order to make it possible to mark an experiment as unsuccessfully)
logger.error { "Evaluation failed for resource '${res.get()}' and load '${load.get()}'. Error: $e" }
throw EvaluationFailedException("Evaluation failed for resource '${res.get()}' and load '${load.get()} ", e)
}
return result
}
......
......@@ -36,13 +36,12 @@ class ExternalSloChecker(
*/
override fun evaluate(fetchedData: List<PrometheusResponse>): Boolean {
var counter = 0
val data = Gson().toJson(
mapOf(
"total_lags" to fetchedData.map { it.data?.result },
"threshold" to threshold,
"warmup" to warmup
)
)
val data = SloJson.Builder()
.results(fetchedData.map { it.data?.result })
.addMetadata("threshold", threshold)
.addMetadata( "warmup", warmup)
.build()
.toJson()
while (counter < RETRIES) {
val result = post(externalSlopeURL, data = data, timeout = TIMEOUT)
......
......@@ -53,8 +53,7 @@ class MetricFetcher(private val prometheusURL: String, private val offset: Durat
} else {
val values = parseValues(response)
if (values.data?.result.isNullOrEmpty()) {
logger.error { "Empty query result: $values between $start and $end for query $query." }
throw NoSuchFieldException()
throw NoSuchFieldException("Empty query result: $values between $start and $end for query $query.")
}
return parseValues(response)
}
......
......@@ -43,22 +43,23 @@ class SloCheckerFactory {
properties: MutableMap<String, String>,
load: LoadDimension
): SloChecker {
return when (sloType) {
"lag trend" -> ExternalSloChecker(
return when (sloType.toLowerCase()) {
SloTypes.LAG_TREND.value, SloTypes.DROPPED_RECORDS.value -> ExternalSloChecker(
externalSlopeURL = properties["externalSloUrl"]
?: throw IllegalArgumentException("externalSloUrl expected"),
threshold = properties["threshold"]?.toInt() ?: throw IllegalArgumentException("threshold expected"),
warmup = properties["warmup"]?.toInt() ?: throw IllegalArgumentException("warmup expected")
)
"lag trend ratio" -> {
var thresholdRatio =
SloTypes.LAG_TREND_RATIO.value, SloTypes.DROPPED_RECORDS_RATIO.value -> {
val thresholdRatio =
properties["ratio"]?.toDouble()
?: throw IllegalArgumentException("ratio for threshold expected")
if (thresholdRatio < 0.0) {
throw IllegalArgumentException("Threshold ratio needs to be an Double greater or equal 0.0")
}
// cast to int, as rounding is not really necessary
var threshold = (load.get() * thresholdRatio).toInt()
val threshold = (load.get() * thresholdRatio).toInt()
ExternalSloChecker(
externalSlopeURL = properties["externalSloUrl"]
......
package theodolite.evaluation
import theodolite.util.InvalidPatcherConfigurationException
import javax.enterprise.context.ApplicationScoped
private const val CONSUMER_LAG_QUERY = "sum by(group)(kafka_consumergroup_group_lag >= 0)"
private const val DROPPED_RECORDS_QUERY = "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)"
@ApplicationScoped
class SloConfigHandler() {
companion object {
fun getQueryString(sloType: String): String {
return when (sloType.toLowerCase()) {
SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> CONSUMER_LAG_QUERY
SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> DROPPED_RECORDS_QUERY
else -> throw InvalidPatcherConfigurationException("Could not find Prometheus query string for slo type $sloType")
}
}
}
}
\ No newline at end of file
package theodolite.evaluation
import com.google.gson.Gson
import theodolite.util.PromResult
class SloJson private constructor(
val results: List<List<PromResult>?>? = null,
var metadata: MutableMap<String, Any>? = null
) {
data class Builder(
var results:List<List<PromResult>?>? = null,
var metadata: MutableMap<String, Any>? = null
) {
/**
* Set the results
*
* @param results list of prometheus results
*/
fun results(results: List<List<PromResult>?>) = apply { this.results = results }
/**
* Add metadata as key value pairs
*
* @param key key of the metadata to be added
* @param value value of the metadata to be added
*/
fun addMetadata(key: String, value: String) = apply {
if (this.metadata.isNullOrEmpty()) {
this.metadata = mutableMapOf(key to value)
} else {
this.metadata!![key] = value
}
}
/**
* Add metadata as key value pairs
*
* @param key key of the metadata to be added
* @param value value of the metadata to be added
*/
fun addMetadata(key: String, value: Int) = apply {
if (this.metadata.isNullOrEmpty()) {
this.metadata = mutableMapOf(key to value)
} else {
this.metadata!![key] = value
}
}
fun build() = SloJson(
results = results,
metadata = metadata
)
}
fun toJson(): String {
return Gson().toJson(mapOf(
"results" to this.results,
"metadata" to this.metadata
))
}
}
\ No newline at end of file
package theodolite.evaluation
enum class SloTypes(val value: String) {
LAG_TREND("lag trend"),
LAG_TREND_RATIO("lag trend ratio"),
DROPPED_RECORDS("dropped records"),
DROPPED_RECORDS_RATIO("dropped records ratio")
}
\ No newline at end of file
......@@ -25,11 +25,12 @@ abstract class BenchmarkExecutor(
val results: Results,
val executionDuration: Duration,
val configurationOverrides: List<ConfigurationOverride?>,
val slo: BenchmarkExecution.Slo,
val slos: List<BenchmarkExecution.Slo>,
val repetitions: Int,
val executionId: Int,
val loadGenerationDelay: Long,
val afterTeardownDelay: Long
val afterTeardownDelay: Long,
val executionName: String
) {
var run: AtomicBoolean = AtomicBoolean(true)
......
......@@ -5,10 +5,8 @@ import mu.KotlinLogging
import theodolite.benchmark.Benchmark
import theodolite.benchmark.BenchmarkExecution
import theodolite.evaluation.AnalysisExecutor
import theodolite.util.ConfigurationOverride
import theodolite.util.LoadDimension
import theodolite.util.Resource
import theodolite.util.Results
import theodolite.execution.operator.EventCreator
import theodolite.util.*
import java.time.Duration
import java.time.Instant
......@@ -20,29 +18,34 @@ class BenchmarkExecutorImpl(
results: Results,
executionDuration: Duration,
configurationOverrides: List<ConfigurationOverride?>,
slo: BenchmarkExecution.Slo,
slos: List<BenchmarkExecution.Slo>,
repetitions: Int,
executionId: Int,
loadGenerationDelay: Long,
afterTeardownDelay: Long
afterTeardownDelay: Long,
executionName: String
) : BenchmarkExecutor(
benchmark,
results,
executionDuration,
configurationOverrides,
slo,
slos,
repetitions,
executionId,
loadGenerationDelay,
afterTeardownDelay
afterTeardownDelay,
executionName
) {
private val eventCreator = EventCreator()
private val mode = Configuration.EXECUTION_MODE
override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
var result = false
val executionIntervals: MutableList<Pair<Instant, Instant>> = ArrayList()
for (i in 1.rangeTo(repetitions)) {
logger.info { "Run repetition $i/$repetitions" }
if (this.run.get()) {
logger.info { "Run repetition $i/$repetitions" }
executionIntervals.add(runSingleExperiment(load, res))
} else {
break
......@@ -53,14 +56,23 @@ class BenchmarkExecutorImpl(
* Analyse the experiment, if [run] is true, otherwise the experiment was canceled by the user.
*/
if (this.run.get()) {
result = AnalysisExecutor(slo = slo, executionId = executionId)
val experimentResults = slos.map {
AnalysisExecutor(slo = it, executionId = executionId)
.analyze(
load = load,
res = res,
executionIntervals = executionIntervals
)
}
result = (false !in experimentResults)
this.results.setResult(Pair(load, res), result)
}
if(!this.run.get()) {
throw ExecutionFailedException("The execution was interrupted")
}
return result
}
......@@ -73,21 +85,48 @@ class BenchmarkExecutorImpl(
this.afterTeardownDelay
)
val from = Instant.now()
// TODO(restructure try catch in order to throw exceptions if there are significant problems by running a experiment)
try {
benchmarkDeployment.setup()
this.waitAndLog()
if (mode == ExecutionModes.OPERATOR.value) {
eventCreator.createEvent(
executionName = executionName,
type = "NORMAL",
reason = "Start experiment",
message = "load: ${load.get()}, resources: ${res.get()}")
}
} catch (e: Exception) {
logger.error { "Error while setup experiment." }
logger.error { "Error is: $e" }
this.run.set(false)
if (mode == ExecutionModes.OPERATOR.value) {
eventCreator.createEvent(
executionName = executionName,
type = "WARNING",
reason = "Start experiment failed",
message = "load: ${load.get()}, resources: ${res.get()}")
}
throw ExecutionFailedException("Error during setup the experiment", e)
}
val to = Instant.now()
try {
benchmarkDeployment.teardown()
if (mode == ExecutionModes.OPERATOR.value) {
eventCreator.createEvent(
executionName = executionName,
type = "NORMAL",
reason = "Stop experiment",
message = "Teardown complete")
}
} catch (e: Exception) {
logger.warn { "Error while tearing down the benchmark deployment." }
logger.debug { "Teardown failed, caused by: $e" }
if (mode == ExecutionModes.OPERATOR.value) {
eventCreator.createEvent(
executionName = executionName,
type = "WARNING",
reason = "Stop experiment failed",
message = "Teardown failed: ${e.message}")
}
throw ExecutionFailedException("Error during teardown the experiment", e)
}
return Pair(from, to)
}
......
package theodolite.execution
enum class ExecutionModes(val value: String) {
OPERATOR("operator"),
YAML_EXECUTOR("yaml-executor"),
STANDALONE("standalone")
}
\ No newline at end of file
......@@ -3,6 +3,7 @@ package theodolite.execution
import io.quarkus.runtime.annotations.QuarkusMain
import mu.KotlinLogging
import theodolite.execution.operator.TheodoliteOperator
import theodolite.util.Configuration
import kotlin.system.exitProcess
private val logger = KotlinLogging.logger {}
......@@ -13,13 +14,12 @@ object Main {
@JvmStatic
fun main(args: Array<String>) {
val mode = System.getenv("MODE") ?: "standalone"
val mode = Configuration.EXECUTION_MODE
logger.info { "Start Theodolite with mode $mode" }
when (mode) {
"standalone" -> TheodoliteStandalone().start()
"yaml-executor" -> TheodoliteStandalone().start() // TODO remove (#209)
"operator" -> TheodoliteOperator().start()
when (mode.toLowerCase()) {
ExecutionModes.STANDALONE.value, ExecutionModes.YAML_EXECUTOR.value -> TheodoliteStandalone().start() // TODO remove standalone (#209)
ExecutionModes.OPERATOR.value -> TheodoliteOperator().start()
else -> {
logger.error { "MODE $mode not found" }
exitProcess(1)
......
......@@ -34,16 +34,15 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b
afterTeardownDelay = 5L
)
deployment.teardown()
logger.info {
"Finished teardown of all benchmark resources."
}
} catch (e: Exception) {
// TODO(throw exception in order to make it possible to mark an experiment as unsuccessfully)
logger.warn {
"Could not delete all specified resources from Kubernetes. " +
"This could be the case, if not all resources are deployed and running."
}
}
logger.info {
"Finished teardown of all benchmark resources."
}
}
}
......@@ -61,11 +61,12 @@ class TheodoliteExecutor(
results = results,
executionDuration = executionDuration,
configurationOverrides = config.configOverrides,
slo = config.slos[0],
slos = config.slos,
repetitions = config.execution.repetitions,
executionId = config.executionId,
loadGenerationDelay = config.execution.loadGenerationDelay,
afterTeardownDelay = config.execution.afterTeardownDelay
afterTeardownDelay = config.execution.afterTeardownDelay,
executionName = config.name
)
if (config.load.loadValues != config.load.loadValues.sorted()) {
......@@ -123,16 +124,19 @@ class TheodoliteExecutor(
val config = buildConfig()
// execute benchmarks for each load
try {
for (load in config.loads) {
if (executor.run.get()) {
config.compositeStrategy.findSuitableResource(load, config.resources)
}
}
} finally {
ioHandler.writeToJSONFile(
config.compositeStrategy.benchmarkExecutor.results,
"${resultsFolder}exp${this.config.executionId}-result"
)
}
}
private fun getAndIncrementExecutionID(fileURL: String): Int {
val ioHandler = IOHandler()
......
......@@ -4,6 +4,8 @@ import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark
import theodolite.util.YamlParserFromFile
import theodolite.util.EvaluationFailedException
import theodolite.util.ExecutionFailedException
import kotlin.concurrent.thread
import kotlin.system.exitProcess
......@@ -49,8 +51,14 @@ class TheodoliteStandalone {
val shutdown = thread(start = false) { Shutdown(benchmarkExecution, benchmark).run() }
Runtime.getRuntime().addShutdownHook(shutdown)
val executor = TheodoliteExecutor(benchmarkExecution, benchmark)
executor.run()
try {
TheodoliteExecutor(benchmarkExecution, benchmark).run()
} catch (e: EvaluationFailedException) {
logger.error { "Evaluation failed with error: ${e.message}" }
}catch (e: ExecutionFailedException) {
logger.error { "Execution failed with error: ${e.message}" }
}
logger.info { "Theodolite finished" }
Runtime.getRuntime().removeShutdownHook(shutdown)
exitProcess(0)
......
......@@ -54,11 +54,8 @@ class ClusterSetup(
benchmark.spec.name = benchmark.metadata.name
Shutdown(execution.spec, benchmark.spec).start()
} else {
logger.error {
"Execution with state ${States.RUNNING.value} was found, but no corresponding benchmark. " +
"Could not initialize cluster."
}
throw IllegalStateException("Cluster state is invalid, required Benchmark for running execution not found.")
throw IllegalStateException("Execution with state ${States.RUNNING.value} was found, but no corresponding benchmark. " +
"Could not initialize cluster.")
}
}
}
......
package theodolite.execution.operator
import io.fabric8.kubernetes.api.model.EventBuilder
import io.fabric8.kubernetes.api.model.EventSource
import io.fabric8.kubernetes.api.model.ObjectReference
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import mu.KotlinLogging
import theodolite.util.Configuration
import java.time.Instant
import java.util.*
import kotlin.NoSuchElementException
private val logger = KotlinLogging.logger {}
class EventCreator {
val client: NamespacedKubernetesClient = DefaultKubernetesClient().inNamespace(Configuration.NAMESPACE)
fun createEvent(executionName: String, type: String, message: String, reason: String) {
val uuid = UUID.randomUUID().toString()
try {
val objectRef = buildObjectReference(executionName)
val event = EventBuilder()
.withNewMetadata()
.withName(uuid)
.endMetadata()
.withMessage(message)
.withReason(reason)
.withType(type)
.withFirstTimestamp(Instant.now().toString()) // TODO change datetime format
.build()
val source = EventSource()
source.component = Configuration.COMPONENT_NAME
event.source = source
event.involvedObject = objectRef
client.v1().events().inNamespace(Configuration.NAMESPACE).createOrReplace(event)
} catch (e: NoSuchElementException) {
logger.warn {"Could not create event: type: $type, message: $message, reason: $reason, no corresponding execution found."}
}
}
private fun buildObjectReference(executionName: String): ObjectReference {
val exec = TheodoliteOperator()
.getExecutionClient(client = client)
.list()
.items
.first{it.metadata.name == executionName}
val objectRef = ObjectReference()
objectRef.apiVersion = exec.apiVersion
objectRef.kind = exec.kind
objectRef.uid = exec.metadata.uid
objectRef.name = exec.metadata.name
objectRef.namespace = exec.metadata.namespace
objectRef.resourceVersion = exec.metadata.resourceVersion
return objectRef
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment