Skip to content
Snippets Groups Projects
Commit d1bef630 authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

Beginn of the implementation of experiment evaluation

parent 71bd421f
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!96Handle shutdown,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing
with 164 additions and 6 deletions
package theodolite.evaluation
import khttp.get
import java.util.*
class MetricFetcher(private val prometheusURL: String) {
fun fetchMetric(start: Long, end: Long, query: String): Any {
val parameter = mapOf(
"query" to query,
"start" to toISODate(start),
"end" to toISODate(end),
"step" to "5s")
val response = get("$prometheusURL/api/v1/query_range", params = parameter)
val values = response.jsonObject.getJSONObject("data").getJSONArray("result").getJSONObject(0)["values"].toString()
return parseValues(values)
return values
}
private fun toISODate(timestamp: Long): String {
val sdf = java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'")
val date = Date(timestamp)
return sdf.format(date)
}
private fun parseValues(values: String): Any {
// TODO("pars with gson")
return ""
}
}
\ No newline at end of file
package theodolite.evaluation
import khttp.get
class SLOCheckerImpl(private val prometheusURL: String): SLOChecker {
override fun evaluate(start: Long, end: Long): Boolean {
val metricFetcher = MetricFetcher(prometheusURL = prometheusURL)
val totalLag = metricFetcher.fetchMetric(start, end, "sum by(group)(kafka_consumergroup_group_lag > 0)")
print(totalLag)
// val parameter = mapOf("" to "")
// val response = get("", params = parameter)
// TODO("call python evaluator")
return false
}
}
\ No newline at end of file
from fastapi import FastAPI
import trend_slope_computer as trend_slope_computer
import logging
import os
import pandas as pd
app = FastAPI()
@app.get("/evaluate-slope")
def evaluate_slope(total_lag):
print("request received")
print(total_lag)
execute(total_lag, 1000)
return {"suitable" : "false"}
def execute(total_lag, threshold):
df = pd.DataFrame(total_lag)
try:
trend_slope = trend_slope_computer.compute(df, 60)
except Exception as e:
err_msg = 'Computing trend slope failed'
print(err_msg)
logging.exception(err_msg)
print('Mark this subexperiment as not successful and continue benchmark')
return False
print(f"Trend Slope: {trend_slope}")
return trend_slope < threshold
from sklearn.linear_model import LinearRegression
import pandas as pd
import os
def compute(input, warmup_sec):
input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp']
regress = input.loc[input['sec_start'] >= warmup_sec] # Warm-Up
X = regress.iloc[:, 2].values.reshape(-1, 1) # values converts it into a numpy array
Y = regress.iloc[:, 3].values.reshape(-1, 1) # -1 means that calculate the dimension of rows, but have 1 column
linear_regressor = LinearRegression() # create object for the class
linear_regressor.fit(X, Y) # perform linear regression
Y_pred = linear_regressor.predict(X) # make predictions
trend_slope = linear_regressor.coef_[0][0]
return trend_slope
...@@ -26,6 +26,7 @@ dependencies { ...@@ -26,6 +26,7 @@ dependencies {
implementation 'io.fabric8:kubernetes-client:5.0.0-alpha-2' implementation 'io.fabric8:kubernetes-client:5.0.0-alpha-2'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.7.0' compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.7.0'
compile group: 'org.apache.zookeeper', name: 'zookeeper', version: '3.6.2' compile group: 'org.apache.zookeeper', name: 'zookeeper', version: '3.6.2'
compile "khttp:khttp:1.0.0"
} }
group 'theodolite' group 'theodolite'
......
package theodolite.evaluation
import khttp.get
import java.util.*
class MetricFetcher(private val prometheusURL: String) {
fun fetchMetric(start: Long, end: Long, query: String): Any {
val parameter = mapOf(
"query" to query,
"start" to toISODate(start),
"end" to toISODate(end),
"step" to "5s")
val response = get("$prometheusURL/api/v1/query_range", params = parameter)
return response.jsonObject.getJSONObject("data").getJSONArray("result").getJSONObject(0)["values"]
}
private fun toISODate(timestamp: Long): String {
val sdf = java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'")
val date = Date(timestamp - (3600 * 1000))
return sdf.format(date)
}
private fun parseValues(values: String): Any {
// TODO("pars with gson")
return ""
}
}
\ No newline at end of file
package theodolite.evaluation package theodolite.evaluation
interface SLOChecker {} interface SLOChecker {
fun evaluate(start: Long, end: Long): Boolean
}
package theodolite.evaluation
import khttp.get
class SLOCheckerImpl(private val prometheusURL: String): SLOChecker {
override fun evaluate(start: Long, end: Long): Boolean {
val metricFetcher = MetricFetcher(prometheusURL = prometheusURL)
val totalLag = metricFetcher.fetchMetric(start, end, "sum by(group)(kafka_consumergroup_group_lag > 0)")
val parameter = mapOf("total_lag" to totalLag.toString())
val response = get("http://127.0.0.1:8000/evaluate-slope", params = parameter)
return response.jsonObject["suitable"] == "true"
}
}
\ No newline at end of file
package theodolite.execution package theodolite.execution
import theodolite.benchmark.Benchmark import theodolite.benchmark.Benchmark
import theodolite.evaluation.SLOChecker
import theodolite.evaluation.SLOCheckerImpl
import theodolite.util.ConfigurationOverride import theodolite.util.ConfigurationOverride
import theodolite.util.LoadDimension import theodolite.util.LoadDimension
import theodolite.util.Resource import theodolite.util.Resource
import theodolite.util.Results import theodolite.util.Results
import java.sql.Time
import java.time.Duration import java.time.Duration
import java.time.Instant
class BenchmarkExecutorImpl( class BenchmarkExecutorImpl(
benchmark: Benchmark, benchmark: Benchmark,
...@@ -19,7 +23,9 @@ class BenchmarkExecutorImpl( ...@@ -19,7 +23,9 @@ class BenchmarkExecutorImpl(
this.waitAndLog() this.waitAndLog()
benchmarkDeployment.teardown() benchmarkDeployment.teardown()
// todo evaluate // todo evaluate
val result = false // if success else false val result = SLOCheckerImpl("http://localhost:32656")
.evaluate(Instant.now().toEpochMilli() - executionDuration.toMillis(),
Instant.now().toEpochMilli())
this.results.setResult(Pair(load, res), result) this.results.setResult(Pair(load, res), result)
return result return result
} }
......
...@@ -5,6 +5,8 @@ import mu.KotlinLogging ...@@ -5,6 +5,8 @@ import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark import theodolite.benchmark.KubernetesBenchmark
import theodolite.util.YamlParser import theodolite.util.YamlParser
import kotlin.system.exitProcess
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@QuarkusMain(name = "TheodoliteYamlExecutor") @QuarkusMain(name = "TheodoliteYamlExecutor")
...@@ -23,5 +25,6 @@ object TheodoliteYamlExecutor { ...@@ -23,5 +25,6 @@ object TheodoliteYamlExecutor {
val executor = TheodoliteExecutor(benchmarkExecution, benchmark) val executor = TheodoliteExecutor(benchmarkExecution, benchmark)
executor.run() executor.run()
logger.info { "Theodolite finished" } logger.info { "Theodolite finished" }
exitProcess(0)
} }
} }
...@@ -4,6 +4,7 @@ import mu.KotlinLogging ...@@ -4,6 +4,7 @@ import mu.KotlinLogging
import org.apache.kafka.clients.admin.AdminClient import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.ListTopicsResult import org.apache.kafka.clients.admin.ListTopicsResult
import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.admin.NewTopic
import java.util.*
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
......
...@@ -3,17 +3,17 @@ benchmark: "benchmarkType" ...@@ -3,17 +3,17 @@ benchmark: "benchmarkType"
load: load:
loadType: "NumSensors" loadType: "NumSensors"
loadValues: loadValues:
- 10000 - 50000
resources: resources:
resourceType: "Instances" resourceType: "Instances"
resourceValues: resourceValues:
- 2 - 1
slos: slos:
- sloType: "slo type" - sloType: "slo type"
threshold: 1000 threshold: 1000
execution: execution:
strategy: "LinearSearch" strategy: "LinearSearch"
duration: 300 duration: 60
repetitions: 1 repetitions: 1
restrictions: restrictions:
- "LowerBound" - "LowerBound"
...@@ -33,7 +33,7 @@ configOverrides: ...@@ -33,7 +33,7 @@ configOverrides:
resource: "uc1-kstreams-deployment.yaml" resource: "uc1-kstreams-deployment.yaml"
container: "uc-application" container: "uc-application"
variableName: "cpu" variableName: "cpu"
value: "50m" value: "1000m"
- patcher: - patcher:
type: "ResourceLimitPatcher" type: "ResourceLimitPatcher"
resource: "uc1-kstreams-deployment.yaml" resource: "uc1-kstreams-deployment.yaml"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment