Skip to content
Snippets Groups Projects
Commit 595b2cf2 authored by Lorenz Boguhn's avatar Lorenz Boguhn Committed by Lorenz Boguhn
Browse files

Use Instant, Offset, exception on empty query and refactoring of python

parent c278cbb0
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!92Introduce experiment evaluation,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
package theodolite.evaluation
import khttp.get
import java.util.*
class MetricFetcher(private val prometheusURL: String) {
fun fetchMetric(start: Long, end: Long, query: String): Any {
val parameter = mapOf(
"query" to query,
"start" to toISODate(start),
"end" to toISODate(end),
"step" to "5s")
val response = get("$prometheusURL/api/v1/query_range", params = parameter)
val values = response.jsonObject.getJSONObject("data").getJSONArray("result").getJSONObject(0)["values"].toString()
return parseValues(values)
return values
}
private fun toISODate(timestamp: Long): String {
val sdf = java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'")
val date = Date(timestamp)
return sdf.format(date)
}
private fun parseValues(values: String): Any {
// TODO("pars with gson")
return ""
}
}
\ No newline at end of file
package theodolite.evaluation
import khttp.get
class SLOCheckerImpl(private val prometheusURL: String): SLOChecker {
override fun evaluate(start: Long, end: Long): Boolean {
val metricFetcher = MetricFetcher(prometheusURL = prometheusURL)
val totalLag = metricFetcher.fetchMetric(start, end, "sum by(group)(kafka_consumergroup_group_lag > 0)")
print(totalLag)
// val parameter = mapOf("" to "")
// val response = get("", params = parameter)
// TODO("call python evaluator")
return false
}
}
\ No newline at end of file
......@@ -4,43 +4,41 @@ import logging
import os
import pandas as pd
import json
import numpy as np
from fastapi.encoders import jsonable_encoder
import sys
app = FastAPI()
def execute(results, threshold):
logging.basicConfig(stream=sys.stdout,
format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger("Api")
logger.setLevel(logging.INFO)
def execute(results, threshold):
d = []
for result in results:
#print(results)
group = result['metric']['group']
for value in result['values']:
# print(value)
d.append({'group': group, 'timestamp': int(
value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
df = pd.DataFrame(d)
print(df)
logger.info(df)
try:
trend_slope = trend_slope_computer.compute(df, 0)
except Exception as e:
err_msg = 'Computing trend slope failed'
print(err_msg)
logging.exception(err_msg)
print('Mark this subexperiment as not successful and continue benchmark')
logger.exception(err_msg)
logger.error('Mark this subexperiment as not successful and continue benchmark')
return False
print(f"Trend Slope: {trend_slope}")
logger.info("Trend Slope: %s", trend_slope)
return trend_slope < threshold
@app.post("/evaluate-slope",response_model=bool)
async def evaluate_slope(request: Request):
print("request received")
x = json.loads(await request.body())
#x = np.array(x['total_lag'])
y = execute(x['total_lag'], 1000)
print(print(y))
return y
data = json.loads(await request.body())
logger.info("Request received")
return execute(data['total_lag'], data['threshold'])
......@@ -6,34 +6,36 @@ import khttp.responses.Response
import mu.KotlinLogging
import theodolite.util.PrometheusResponse
import java.net.ConnectException
import java.util.*
import java.time.Duration
import java.time.Instant
private val logger = KotlinLogging.logger {}
class MetricFetcher(private val prometheusURL: String, private val offset: Duration) {
class MetricFetcher(private val prometheusURL: String) {
fun fetchMetric(start: Instant, end: Instant, query: String): PrometheusResponse {
val offsetStart = start.minus(offset)
val offsetEnd = end.minus(offset)
// Instant
fun fetchMetric(start: Long, end: Long, query: String): PrometheusResponse {
// TODO handle timeouts
var trys = 0
val parameter = mapOf(
"query" to query,
"start" to toISODate(start),
"end" to toISODate(end),
"start" to offsetStart.toString(),
"end" to offsetEnd.toString(),
"step" to "5s"
)
while (trys < 2) {
val response = get("$prometheusURL/api/v1/query_range", params = parameter)
val response = get("$prometheusURL/api/v1/query_range", params = parameter, timeout = 60.0)
if (response.statusCode != 200) {
val message = response.jsonObject.toString()
logger.warn { "Could not connect to Prometheus: $message, retrying now" }
trys++
} else {
var values = parseValues(response)
val values = parseValues(response)
if (values.data?.result.isNullOrEmpty()) {
logger.warn { "Empty query result: $values" }
logger.error { "Empty query result: $values" }
throw NoSuchFieldException()
}
return parseValues(response)
......@@ -42,13 +44,6 @@ class MetricFetcher(private val prometheusURL: String) {
throw ConnectException("No answer from Prometheus received")
}
// TODO required?
private fun toISODate(timestamp: Long): String {
val sdf = java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'")
val date = Date(timestamp - (3600 * 1000))//subtract 1h since cluster is in another timezone
return sdf.format(date)
}
private fun parseValues(values: Response): PrometheusResponse {
return Gson().fromJson<PrometheusResponse>(
values.jsonObject.toString(),
......
package theodolite.evaluation
interface SLOChecker {
fun evaluate(start: Long, end: Long): Boolean
import java.time.Instant
interface SLOChecker {
fun evaluate(start: Instant, end: Instant): Boolean
}
package theodolite.evaluation
import com.google.gson.Gson
import khttp.post
import org.json.JSONObject
import java.net.ConnectException
import java.time.Duration
import java.time.Instant
class SLOCheckerImpl(private val prometheusURL: String) : SLOChecker {
class SLOCheckerImpl(private val prometheusURL: String, private val threshold: Int, private val offset: Duration) :
SLOChecker {
override fun evaluate(start: Long, end: Long): Boolean {
override fun evaluate(start: Instant, end: Instant): Boolean {
var counter = 0
val metricFetcher = MetricFetcher(prometheusURL = prometheusURL)
val metricFetcher = MetricFetcher(prometheusURL = prometheusURL, offset = offset)
val fetchedData = metricFetcher.fetchMetric(start, end, "sum by(group)(kafka_consumergroup_group_lag >= 0)")
val data = JSONObject(mapOf("total_lag" to fetchedData.data?.result))
val data = Gson().toJson(mapOf("total_lag" to fetchedData.data?.result, "threshold" to threshold))
while (counter < 2) {
// TODO handle timeouts
val result = post("http://127.0.0.1:8000/evaluate-slope", data = data)
val result = post("http://127.0.0.1:8000/evaluate-slope", data = data, timeout = 60.0)
if (result.statusCode != 200) {
counter++
} else {
......
package theodolite.execution
import mu.KotlinLogging
import theodolite.benchmark.Benchmark
import theodolite.evaluation.SLOCheckerImpl
import theodolite.util.ConfigurationOverride
......@@ -9,6 +10,8 @@ import theodolite.util.Results
import java.time.Duration
import java.time.Instant
private val logger = KotlinLogging.logger {}
class BenchmarkExecutorImpl(
benchmark: Benchmark,
results: Results,
......@@ -22,11 +25,18 @@ class BenchmarkExecutorImpl(
this.waitAndLog()
benchmarkDeployment.teardown()
// todo evaluate
val result = SLOCheckerImpl("http://localhost:32656")
.evaluate( //TODO FIX HERE, catch exception -> return false
Instant.now().toEpochMilli() - executionDuration.toMillis(), // TODO instant.minus(duration)
Instant.now().toEpochMilli()
)
var result = false
try {
result = SLOCheckerImpl("http://localhost:32656", 100, offset = Duration.ofSeconds(0))
.evaluate(
Instant.now().minus(executionDuration),
Instant.now()
)
} catch (e: Exception) {
logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" }
}
this.results.setResult(Pair(load, res), result)
return result
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment