diff --git a/MetricFetcher.kt b/MetricFetcher.kt new file mode 100644 index 0000000000000000000000000000000000000000..cb5feac1ef4d7a6bd30d1cef0f87d1532c6d9d31 --- /dev/null +++ b/MetricFetcher.kt @@ -0,0 +1,33 @@ +package theodolite.evaluation + +import khttp.get +import java.util.* + + +class MetricFetcher(private val prometheusURL: String) { + + fun fetchMetric(start: Long, end: Long, query: String): Any { + + val parameter = mapOf( + "query" to query, + "start" to toISODate(start), + "end" to toISODate(end), + "step" to "5s") + + val response = get("$prometheusURL/api/v1/query_range", params = parameter) + val values = response.jsonObject.getJSONObject("data").getJSONArray("result").getJSONObject(0)["values"].toString() + return parseValues(values) + return values + } + + private fun toISODate(timestamp: Long): String { + val sdf = java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'") + val date = Date(timestamp) + return sdf.format(date) + } + + private fun parseValues(values: String): Any { + // TODO("pars with gson") + return "" + } +} \ No newline at end of file diff --git a/SLOCheckerImpl.kt b/SLOCheckerImpl.kt new file mode 100644 index 0000000000000000000000000000000000000000..7687fc803bd0cba91be6670339150a80c213641b --- /dev/null +++ b/SLOCheckerImpl.kt @@ -0,0 +1,17 @@ +package theodolite.evaluation + +import khttp.get + +class SLOCheckerImpl(private val prometheusURL: String): SLOChecker { + + override fun evaluate(start: Long, end: Long): Boolean { + val metricFetcher = MetricFetcher(prometheusURL = prometheusURL) + val totalLag = metricFetcher.fetchMetric(start, end, "sum by(group)(kafka_consumergroup_group_lag > 0)") + print(totalLag) + // val parameter = mapOf("" to "") + // val response = get("", params = parameter) + // TODO("call python evaluator") + + return false + } +} \ No newline at end of file diff --git a/slope-evaluator/api.py b/slope-evaluator/api.py new file mode 100644 index 0000000000000000000000000000000000000000..e3fa5b74c31ff43451871ceda4c10da206d423d0 --- /dev/null +++ b/slope-evaluator/api.py @@ -0,0 +1,32 @@ +from fastapi import FastAPI +import trend_slope_computer as trend_slope_computer +import logging +import os +import pandas as pd + +app = FastAPI() + + +@app.get("/evaluate-slope") +def evaluate_slope(total_lag): + print("request received") + print(total_lag) + execute(total_lag, 1000) + return {"suitable" : "false"} + + + +def execute(total_lag, threshold): + df = pd.DataFrame(total_lag) + try: + trend_slope = trend_slope_computer.compute(df, 60) + except Exception as e: + err_msg = 'Computing trend slope failed' + print(err_msg) + logging.exception(err_msg) + print('Mark this subexperiment as not successful and continue benchmark') + return False + + print(f"Trend Slope: {trend_slope}") + + return trend_slope < threshold diff --git a/slope-evaluator/trend_slope_computer.py b/slope-evaluator/trend_slope_computer.py new file mode 100644 index 0000000000000000000000000000000000000000..ea6e7bacdfdc176d8049a41c7090093f9b5d852f --- /dev/null +++ b/slope-evaluator/trend_slope_computer.py @@ -0,0 +1,17 @@ +from sklearn.linear_model import LinearRegression +import pandas as pd +import os + +def compute(input, warmup_sec): + input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp'] + regress = input.loc[input['sec_start'] >= warmup_sec] # Warm-Up + + X = regress.iloc[:, 2].values.reshape(-1, 1) # values converts it into a numpy array + Y = regress.iloc[:, 3].values.reshape(-1, 1) # -1 means that calculate the dimension of rows, but have 1 column + linear_regressor = LinearRegression() # create object for the class + linear_regressor.fit(X, Y) # perform linear regression + Y_pred = linear_regressor.predict(X) # make predictions + + trend_slope = linear_regressor.coef_[0][0] + + return trend_slope diff --git a/theodolite-quarkus/build.gradle b/theodolite-quarkus/build.gradle index ba80ced4b1e4e58f34d5b316f4a46f4e032654a9..63d08a53f93fe9f275c1e5de01df47ec9dc2a27a 100644 --- a/theodolite-quarkus/build.gradle +++ b/theodolite-quarkus/build.gradle @@ -26,6 +26,7 @@ dependencies { implementation 'io.fabric8:kubernetes-client:5.0.0-alpha-2' compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.7.0' compile group: 'org.apache.zookeeper', name: 'zookeeper', version: '3.6.2' + compile "khttp:khttp:1.0.0" } group 'theodolite' diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt new file mode 100644 index 0000000000000000000000000000000000000000..f6488041158a233371c4565e1ad66e0a40d6f2ff --- /dev/null +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt @@ -0,0 +1,31 @@ +package theodolite.evaluation + +import khttp.get +import java.util.* + + +class MetricFetcher(private val prometheusURL: String) { + + fun fetchMetric(start: Long, end: Long, query: String): Any { + + val parameter = mapOf( + "query" to query, + "start" to toISODate(start), + "end" to toISODate(end), + "step" to "5s") + + val response = get("$prometheusURL/api/v1/query_range", params = parameter) + return response.jsonObject.getJSONObject("data").getJSONArray("result").getJSONObject(0)["values"] + } + + private fun toISODate(timestamp: Long): String { + val sdf = java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'") + val date = Date(timestamp - (3600 * 1000)) + return sdf.format(date) + } + + private fun parseValues(values: String): Any { + // TODO("pars with gson") + return "" + } +} \ No newline at end of file diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOChecker.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOChecker.kt index 396e1864b491e15d44881439c10847a39ea18286..5cab3e2e43a570d0ad388d46525292dbb9e37d4b 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOChecker.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOChecker.kt @@ -1,3 +1,6 @@ package theodolite.evaluation -interface SLOChecker {} +interface SLOChecker { + fun evaluate(start: Long, end: Long): Boolean + +} diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOCheckerImpl.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOCheckerImpl.kt new file mode 100644 index 0000000000000000000000000000000000000000..594803f06d35f1e0706439aae0af3843d12ac5d0 --- /dev/null +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOCheckerImpl.kt @@ -0,0 +1,14 @@ +package theodolite.evaluation + +import khttp.get + +class SLOCheckerImpl(private val prometheusURL: String): SLOChecker { + + override fun evaluate(start: Long, end: Long): Boolean { + val metricFetcher = MetricFetcher(prometheusURL = prometheusURL) + val totalLag = metricFetcher.fetchMetric(start, end, "sum by(group)(kafka_consumergroup_group_lag > 0)") + val parameter = mapOf("total_lag" to totalLag.toString()) + val response = get("http://127.0.0.1:8000/evaluate-slope", params = parameter) + return response.jsonObject["suitable"] == "true" + } +} \ No newline at end of file diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt index 45616a100a8ed067237413ac5afa9fd32f4865e1..99f21294a3d63144734f1afec1c9be1c6bfb63da 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt @@ -1,11 +1,15 @@ package theodolite.execution import theodolite.benchmark.Benchmark +import theodolite.evaluation.SLOChecker +import theodolite.evaluation.SLOCheckerImpl import theodolite.util.ConfigurationOverride import theodolite.util.LoadDimension import theodolite.util.Resource import theodolite.util.Results +import java.sql.Time import java.time.Duration +import java.time.Instant class BenchmarkExecutorImpl( benchmark: Benchmark, @@ -19,7 +23,9 @@ class BenchmarkExecutorImpl( this.waitAndLog() benchmarkDeployment.teardown() // todo evaluate - val result = false // if success else false + val result = SLOCheckerImpl("http://localhost:32656") + .evaluate(Instant.now().toEpochMilli() - executionDuration.toMillis(), + Instant.now().toEpochMilli()) this.results.setResult(Pair(load, res), result) return result } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt index ecc202542a0472808f70f9c5dd9696e2de370ea1..f6c109dac52ea4a4faeb30b8041329bc86812552 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt @@ -5,6 +5,8 @@ import mu.KotlinLogging import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.KubernetesBenchmark import theodolite.util.YamlParser +import kotlin.system.exitProcess + private val logger = KotlinLogging.logger {} @QuarkusMain(name = "TheodoliteYamlExecutor") @@ -23,5 +25,6 @@ object TheodoliteYamlExecutor { val executor = TheodoliteExecutor(benchmarkExecution, benchmark) executor.run() logger.info { "Theodolite finished" } + exitProcess(0) } } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt index 9cdf1c43fae9a72bd78126d420522b2d41a399ee..1336f57517ef74d8c781cc3b51bf130dbf8d99c5 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt @@ -4,6 +4,7 @@ import mu.KotlinLogging import org.apache.kafka.clients.admin.AdminClient import org.apache.kafka.clients.admin.ListTopicsResult import org.apache.kafka.clients.admin.NewTopic +import java.util.* private val logger = KotlinLogging.logger {} diff --git a/theodolite-quarkus/src/main/resources/yaml/BenchmarkExecution.yaml b/theodolite-quarkus/src/main/resources/yaml/BenchmarkExecution.yaml index 270daee1708ca4791c65ff9f4a9e1a1e7e78c4d3..f705b8362448cad7de99080143d31315de8af524 100644 --- a/theodolite-quarkus/src/main/resources/yaml/BenchmarkExecution.yaml +++ b/theodolite-quarkus/src/main/resources/yaml/BenchmarkExecution.yaml @@ -3,17 +3,17 @@ benchmark: "benchmarkType" load: loadType: "NumSensors" loadValues: - - 10000 + - 50000 resources: resourceType: "Instances" resourceValues: - - 2 + - 1 slos: - sloType: "slo type" threshold: 1000 execution: strategy: "LinearSearch" - duration: 300 + duration: 60 repetitions: 1 restrictions: - "LowerBound" @@ -33,7 +33,7 @@ configOverrides: resource: "uc1-kstreams-deployment.yaml" container: "uc-application" variableName: "cpu" - value: "50m" + value: "1000m" - patcher: type: "ResourceLimitPatcher" resource: "uc1-kstreams-deployment.yaml"