diff --git a/MetricFetcher.kt b/MetricFetcher.kt deleted file mode 100644 index cb5feac1ef4d7a6bd30d1cef0f87d1532c6d9d31..0000000000000000000000000000000000000000 --- a/MetricFetcher.kt +++ /dev/null @@ -1,33 +0,0 @@ -package theodolite.evaluation - -import khttp.get -import java.util.* - - -class MetricFetcher(private val prometheusURL: String) { - - fun fetchMetric(start: Long, end: Long, query: String): Any { - - val parameter = mapOf( - "query" to query, - "start" to toISODate(start), - "end" to toISODate(end), - "step" to "5s") - - val response = get("$prometheusURL/api/v1/query_range", params = parameter) - val values = response.jsonObject.getJSONObject("data").getJSONArray("result").getJSONObject(0)["values"].toString() - return parseValues(values) - return values - } - - private fun toISODate(timestamp: Long): String { - val sdf = java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'") - val date = Date(timestamp) - return sdf.format(date) - } - - private fun parseValues(values: String): Any { - // TODO("pars with gson") - return "" - } -} \ No newline at end of file diff --git a/SLOCheckerImpl.kt b/SLOCheckerImpl.kt deleted file mode 100644 index 7687fc803bd0cba91be6670339150a80c213641b..0000000000000000000000000000000000000000 --- a/SLOCheckerImpl.kt +++ /dev/null @@ -1,17 +0,0 @@ -package theodolite.evaluation - -import khttp.get - -class SLOCheckerImpl(private val prometheusURL: String): SLOChecker { - - override fun evaluate(start: Long, end: Long): Boolean { - val metricFetcher = MetricFetcher(prometheusURL = prometheusURL) - val totalLag = metricFetcher.fetchMetric(start, end, "sum by(group)(kafka_consumergroup_group_lag > 0)") - print(totalLag) - // val parameter = mapOf("" to "") - // val response = get("", params = parameter) - // TODO("call python evaluator") - - return false - } -} \ No newline at end of file diff --git a/slope-evaluator/api.py b/slope-evaluator/api.py index 927c454fe56eb5b4f2d668f1ecbbe0ded1b35048..404ed22166f0549c01afc078e8b083bb2039eae0 100644 --- a/slope-evaluator/api.py +++ b/slope-evaluator/api.py @@ -4,43 +4,41 @@ import logging import os import pandas as pd import json -import numpy as np -from fastapi.encoders import jsonable_encoder +import sys app = FastAPI() -def execute(results, threshold): +logging.basicConfig(stream=sys.stdout, + format="%(asctime)s %(levelname)s %(name)s: %(message)s") +logger = logging.getLogger("Api") + +logger.setLevel(logging.INFO) +def execute(results, threshold): d = [] for result in results: - #print(results) group = result['metric']['group'] for value in result['values']: - # print(value) d.append({'group': group, 'timestamp': int( value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0}) df = pd.DataFrame(d) - print(df) + logger.info(df) try: trend_slope = trend_slope_computer.compute(df, 0) except Exception as e: err_msg = 'Computing trend slope failed' - print(err_msg) - logging.exception(err_msg) - print('Mark this subexperiment as not successful and continue benchmark') + logger.exception(err_msg) + logger.error('Mark this subexperiment as not successful and continue benchmark') return False - print(f"Trend Slope: {trend_slope}") + logger.info("Trend Slope: %s", trend_slope) return trend_slope < threshold @app.post("/evaluate-slope",response_model=bool) async def evaluate_slope(request: Request): - print("request received") - x = json.loads(await request.body()) - #x = np.array(x['total_lag']) - y = execute(x['total_lag'], 1000) - print(print(y)) - return y + data = json.loads(await request.body()) + logger.info("Request received") + return execute(data['total_lag'], data['threshold']) diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt index 9c80668ab1d4b1716d37cdc4b484ed164ef81ff8..f320ca3cfa93db86a80a4c1a5d5489a9585d4af5 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt @@ -6,34 +6,36 @@ import khttp.responses.Response import mu.KotlinLogging import theodolite.util.PrometheusResponse import java.net.ConnectException -import java.util.* +import java.time.Duration +import java.time.Instant private val logger = KotlinLogging.logger {} +class MetricFetcher(private val prometheusURL: String, private val offset: Duration) { -class MetricFetcher(private val prometheusURL: String) { + fun fetchMetric(start: Instant, end: Instant, query: String): PrometheusResponse { + + val offsetStart = start.minus(offset) + val offsetEnd = end.minus(offset) - // Instant - fun fetchMetric(start: Long, end: Long, query: String): PrometheusResponse { - // TODO handle timeouts var trys = 0 val parameter = mapOf( "query" to query, - "start" to toISODate(start), - "end" to toISODate(end), + "start" to offsetStart.toString(), + "end" to offsetEnd.toString(), "step" to "5s" ) while (trys < 2) { - val response = get("$prometheusURL/api/v1/query_range", params = parameter) + val response = get("$prometheusURL/api/v1/query_range", params = parameter, timeout = 60.0) if (response.statusCode != 200) { val message = response.jsonObject.toString() logger.warn { "Could not connect to Prometheus: $message, retrying now" } trys++ } else { - var values = parseValues(response) + val values = parseValues(response) if (values.data?.result.isNullOrEmpty()) { - logger.warn { "Empty query result: $values" } + logger.error { "Empty query result: $values" } throw NoSuchFieldException() } return parseValues(response) @@ -42,13 +44,6 @@ class MetricFetcher(private val prometheusURL: String) { throw ConnectException("No answer from Prometheus received") } - // TODO required? - private fun toISODate(timestamp: Long): String { - val sdf = java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'") - val date = Date(timestamp - (3600 * 1000))//subtract 1h since cluster is in another timezone - return sdf.format(date) - } - private fun parseValues(values: Response): PrometheusResponse { return Gson().fromJson<PrometheusResponse>( values.jsonObject.toString(), diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOChecker.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOChecker.kt index 5cab3e2e43a570d0ad388d46525292dbb9e37d4b..8207c1408c4ca177fc291490687efc47878245d8 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOChecker.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOChecker.kt @@ -1,6 +1,7 @@ package theodolite.evaluation -interface SLOChecker { - fun evaluate(start: Long, end: Long): Boolean +import java.time.Instant +interface SLOChecker { + fun evaluate(start: Instant, end: Instant): Boolean } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOCheckerImpl.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOCheckerImpl.kt index d602a76811dff1df6e6eeecfee9d59a2f1f295e3..60b3cbd41215e1914f56027a255fe902ca9e1cb8 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOCheckerImpl.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOCheckerImpl.kt @@ -1,20 +1,22 @@ package theodolite.evaluation +import com.google.gson.Gson import khttp.post -import org.json.JSONObject import java.net.ConnectException +import java.time.Duration +import java.time.Instant -class SLOCheckerImpl(private val prometheusURL: String) : SLOChecker { +class SLOCheckerImpl(private val prometheusURL: String, private val threshold: Int, private val offset: Duration) : + SLOChecker { - override fun evaluate(start: Long, end: Long): Boolean { + override fun evaluate(start: Instant, end: Instant): Boolean { var counter = 0 - val metricFetcher = MetricFetcher(prometheusURL = prometheusURL) + val metricFetcher = MetricFetcher(prometheusURL = prometheusURL, offset = offset) val fetchedData = metricFetcher.fetchMetric(start, end, "sum by(group)(kafka_consumergroup_group_lag >= 0)") - val data = JSONObject(mapOf("total_lag" to fetchedData.data?.result)) - + val data = Gson().toJson(mapOf("total_lag" to fetchedData.data?.result, "threshold" to threshold)) + while (counter < 2) { - // TODO handle timeouts - val result = post("http://127.0.0.1:8000/evaluate-slope", data = data) + val result = post("http://127.0.0.1:8000/evaluate-slope", data = data, timeout = 60.0) if (result.statusCode != 200) { counter++ } else { diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt index 630bb343acb858e1deace08caef8e4656cce333f..0bf870b2f27837958483b22ebe247d26913ac41a 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt @@ -1,5 +1,6 @@ package theodolite.execution +import mu.KotlinLogging import theodolite.benchmark.Benchmark import theodolite.evaluation.SLOCheckerImpl import theodolite.util.ConfigurationOverride @@ -9,6 +10,8 @@ import theodolite.util.Results import java.time.Duration import java.time.Instant +private val logger = KotlinLogging.logger {} + class BenchmarkExecutorImpl( benchmark: Benchmark, results: Results, @@ -22,11 +25,18 @@ class BenchmarkExecutorImpl( this.waitAndLog() benchmarkDeployment.teardown() // todo evaluate - val result = SLOCheckerImpl("http://localhost:32656") - .evaluate( //TODO FIX HERE, catch exception -> return false - Instant.now().toEpochMilli() - executionDuration.toMillis(), // TODO instant.minus(duration) - Instant.now().toEpochMilli() - ) + + var result = false + try { + result = SLOCheckerImpl("http://localhost:32656", 100, offset = Duration.ofSeconds(0)) + .evaluate( + Instant.now().minus(executionDuration), + Instant.now() + ) + } catch (e: Exception) { + logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" } + } + this.results.setResult(Pair(load, res), result) return result }