From d1bef630d830d2dadc4ac06eb3036bf9371f1424 Mon Sep 17 00:00:00 2001 From: "stu126940@mail.uni-kiel.de" <stu126940@mail.uni-kiel.de> Date: Wed, 10 Mar 2021 16:51:34 +0100 Subject: [PATCH] Beginn of the implementation of experiment evaluation --- MetricFetcher.kt | 33 +++++++++++++++++++ SLOCheckerImpl.kt | 17 ++++++++++ slope-evaluator/api.py | 32 ++++++++++++++++++ slope-evaluator/trend_slope_computer.py | 17 ++++++++++ theodolite-quarkus/build.gradle | 1 + .../theodolite/evaluation/MetricFetcher.kt | 31 +++++++++++++++++ .../theodolite/evaluation/SLOChecker.kt | 5 ++- .../theodolite/evaluation/SLOCheckerImpl.kt | 14 ++++++++ .../execution/BenchmarkExecutorImpl.kt | 8 ++++- .../execution/TheodoliteYamlExecutor.kt | 3 ++ .../kotlin/theodolite/k8s/TopicManager.kt | 1 + .../resources/yaml/BenchmarkExecution.yaml | 8 ++--- 12 files changed, 164 insertions(+), 6 deletions(-) create mode 100644 MetricFetcher.kt create mode 100644 SLOCheckerImpl.kt create mode 100644 slope-evaluator/api.py create mode 100644 slope-evaluator/trend_slope_computer.py create mode 100644 theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt create mode 100644 theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOCheckerImpl.kt diff --git a/MetricFetcher.kt b/MetricFetcher.kt new file mode 100644 index 000000000..cb5feac1e --- /dev/null +++ b/MetricFetcher.kt @@ -0,0 +1,33 @@ +package theodolite.evaluation + +import khttp.get +import java.util.* + + +class MetricFetcher(private val prometheusURL: String) { + + fun fetchMetric(start: Long, end: Long, query: String): Any { + + val parameter = mapOf( + "query" to query, + "start" to toISODate(start), + "end" to toISODate(end), + "step" to "5s") + + val response = get("$prometheusURL/api/v1/query_range", params = parameter) + val values = response.jsonObject.getJSONObject("data").getJSONArray("result").getJSONObject(0)["values"].toString() + return parseValues(values) + return values + } + + private fun toISODate(timestamp: Long): String { + val sdf = java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'") + val date = Date(timestamp) + return sdf.format(date) + } + + private fun parseValues(values: String): Any { + // TODO("pars with gson") + return "" + } +} \ No newline at end of file diff --git a/SLOCheckerImpl.kt b/SLOCheckerImpl.kt new file mode 100644 index 000000000..7687fc803 --- /dev/null +++ b/SLOCheckerImpl.kt @@ -0,0 +1,17 @@ +package theodolite.evaluation + +import khttp.get + +class SLOCheckerImpl(private val prometheusURL: String): SLOChecker { + + override fun evaluate(start: Long, end: Long): Boolean { + val metricFetcher = MetricFetcher(prometheusURL = prometheusURL) + val totalLag = metricFetcher.fetchMetric(start, end, "sum by(group)(kafka_consumergroup_group_lag > 0)") + print(totalLag) + // val parameter = mapOf("" to "") + // val response = get("", params = parameter) + // TODO("call python evaluator") + + return false + } +} \ No newline at end of file diff --git a/slope-evaluator/api.py b/slope-evaluator/api.py new file mode 100644 index 000000000..e3fa5b74c --- /dev/null +++ b/slope-evaluator/api.py @@ -0,0 +1,32 @@ +from fastapi import FastAPI +import trend_slope_computer as trend_slope_computer +import logging +import os +import pandas as pd + +app = FastAPI() + + +@app.get("/evaluate-slope") +def evaluate_slope(total_lag): + print("request received") + print(total_lag) + execute(total_lag, 1000) + return {"suitable" : "false"} + + + +def execute(total_lag, threshold): + df = pd.DataFrame(total_lag) + try: + trend_slope = trend_slope_computer.compute(df, 60) + except Exception as e: + err_msg = 'Computing trend slope failed' + print(err_msg) + logging.exception(err_msg) + print('Mark this subexperiment as not successful and continue benchmark') + return False + + print(f"Trend Slope: {trend_slope}") + + return trend_slope < threshold diff --git a/slope-evaluator/trend_slope_computer.py b/slope-evaluator/trend_slope_computer.py new file mode 100644 index 000000000..ea6e7bacd --- /dev/null +++ b/slope-evaluator/trend_slope_computer.py @@ -0,0 +1,17 @@ +from sklearn.linear_model import LinearRegression +import pandas as pd +import os + +def compute(input, warmup_sec): + input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp'] + regress = input.loc[input['sec_start'] >= warmup_sec] # Warm-Up + + X = regress.iloc[:, 2].values.reshape(-1, 1) # values converts it into a numpy array + Y = regress.iloc[:, 3].values.reshape(-1, 1) # -1 means that calculate the dimension of rows, but have 1 column + linear_regressor = LinearRegression() # create object for the class + linear_regressor.fit(X, Y) # perform linear regression + Y_pred = linear_regressor.predict(X) # make predictions + + trend_slope = linear_regressor.coef_[0][0] + + return trend_slope diff --git a/theodolite-quarkus/build.gradle b/theodolite-quarkus/build.gradle index ba80ced4b..63d08a53f 100644 --- a/theodolite-quarkus/build.gradle +++ b/theodolite-quarkus/build.gradle @@ -26,6 +26,7 @@ dependencies { implementation 'io.fabric8:kubernetes-client:5.0.0-alpha-2' compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.7.0' compile group: 'org.apache.zookeeper', name: 'zookeeper', version: '3.6.2' + compile "khttp:khttp:1.0.0" } group 'theodolite' diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt new file mode 100644 index 000000000..f64880411 --- /dev/null +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt @@ -0,0 +1,31 @@ +package theodolite.evaluation + +import khttp.get +import java.util.* + + +class MetricFetcher(private val prometheusURL: String) { + + fun fetchMetric(start: Long, end: Long, query: String): Any { + + val parameter = mapOf( + "query" to query, + "start" to toISODate(start), + "end" to toISODate(end), + "step" to "5s") + + val response = get("$prometheusURL/api/v1/query_range", params = parameter) + return response.jsonObject.getJSONObject("data").getJSONArray("result").getJSONObject(0)["values"] + } + + private fun toISODate(timestamp: Long): String { + val sdf = java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'") + val date = Date(timestamp - (3600 * 1000)) + return sdf.format(date) + } + + private fun parseValues(values: String): Any { + // TODO("pars with gson") + return "" + } +} \ No newline at end of file diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOChecker.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOChecker.kt index 396e1864b..5cab3e2e4 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOChecker.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOChecker.kt @@ -1,3 +1,6 @@ package theodolite.evaluation -interface SLOChecker {} +interface SLOChecker { + fun evaluate(start: Long, end: Long): Boolean + +} diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOCheckerImpl.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOCheckerImpl.kt new file mode 100644 index 000000000..594803f06 --- /dev/null +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOCheckerImpl.kt @@ -0,0 +1,14 @@ +package theodolite.evaluation + +import khttp.get + +class SLOCheckerImpl(private val prometheusURL: String): SLOChecker { + + override fun evaluate(start: Long, end: Long): Boolean { + val metricFetcher = MetricFetcher(prometheusURL = prometheusURL) + val totalLag = metricFetcher.fetchMetric(start, end, "sum by(group)(kafka_consumergroup_group_lag > 0)") + val parameter = mapOf("total_lag" to totalLag.toString()) + val response = get("http://127.0.0.1:8000/evaluate-slope", params = parameter) + return response.jsonObject["suitable"] == "true" + } +} \ No newline at end of file diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt index 45616a100..99f21294a 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt @@ -1,11 +1,15 @@ package theodolite.execution import theodolite.benchmark.Benchmark +import theodolite.evaluation.SLOChecker +import theodolite.evaluation.SLOCheckerImpl import theodolite.util.ConfigurationOverride import theodolite.util.LoadDimension import theodolite.util.Resource import theodolite.util.Results +import java.sql.Time import java.time.Duration +import java.time.Instant class BenchmarkExecutorImpl( benchmark: Benchmark, @@ -19,7 +23,9 @@ class BenchmarkExecutorImpl( this.waitAndLog() benchmarkDeployment.teardown() // todo evaluate - val result = false // if success else false + val result = SLOCheckerImpl("http://localhost:32656") + .evaluate(Instant.now().toEpochMilli() - executionDuration.toMillis(), + Instant.now().toEpochMilli()) this.results.setResult(Pair(load, res), result) return result } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt index ecc202542..f6c109dac 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt @@ -5,6 +5,8 @@ import mu.KotlinLogging import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.KubernetesBenchmark import theodolite.util.YamlParser +import kotlin.system.exitProcess + private val logger = KotlinLogging.logger {} @QuarkusMain(name = "TheodoliteYamlExecutor") @@ -23,5 +25,6 @@ object TheodoliteYamlExecutor { val executor = TheodoliteExecutor(benchmarkExecution, benchmark) executor.run() logger.info { "Theodolite finished" } + exitProcess(0) } } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt index 9cdf1c43f..1336f5751 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt @@ -4,6 +4,7 @@ import mu.KotlinLogging import org.apache.kafka.clients.admin.AdminClient import org.apache.kafka.clients.admin.ListTopicsResult import org.apache.kafka.clients.admin.NewTopic +import java.util.* private val logger = KotlinLogging.logger {} diff --git a/theodolite-quarkus/src/main/resources/yaml/BenchmarkExecution.yaml b/theodolite-quarkus/src/main/resources/yaml/BenchmarkExecution.yaml index 270daee17..f705b8362 100644 --- a/theodolite-quarkus/src/main/resources/yaml/BenchmarkExecution.yaml +++ b/theodolite-quarkus/src/main/resources/yaml/BenchmarkExecution.yaml @@ -3,17 +3,17 @@ benchmark: "benchmarkType" load: loadType: "NumSensors" loadValues: - - 10000 + - 50000 resources: resourceType: "Instances" resourceValues: - - 2 + - 1 slos: - sloType: "slo type" threshold: 1000 execution: strategy: "LinearSearch" - duration: 300 + duration: 60 repetitions: 1 restrictions: - "LowerBound" @@ -33,7 +33,7 @@ configOverrides: resource: "uc1-kstreams-deployment.yaml" container: "uc-application" variableName: "cpu" - value: "50m" + value: "1000m" - patcher: type: "ResourceLimitPatcher" resource: "uc1-kstreams-deployment.yaml" -- GitLab