Skip to content
Snippets Groups Projects
Commit d4ead8a5 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'feature/135-introduce-experiment-evaluation' into 'theodolite-kotlin'

Introduce experiment evaluation

See merge request !92
parents 39fbf337 cdc69c62
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!92Introduce experiment evaluation,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Pipeline #2334 passed
Showing
with 330 additions and 22 deletions
# SLO evaluator service image.
# The base image bundles uvicorn + gunicorn and, per its documentation
# (linked in the README), serves the FastAPI app found under /app.
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.7
# Install Python dependencies first so this layer is cached independently
# of application-code changes.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
# Copy the application code to the location the base image's server expects.
COPY ./app /app
\ No newline at end of file
# Lag Trend SLO Evaluator
## Execution
For development:
```sh
uvicorn main:app --reload
```
Build the docker image:
```sh
docker build . -t theodolite-evaluator
```
Run the Docker image:
```sh
docker run -p 80:80 theodolite-evaluator
```
## Configuration
You can set the `HOST` and the `PORT` (and many more parameters) via environment variables. The default is `0.0.0.0:80`.
For more information see [here](https://github.com/tiangolo/uvicorn-gunicorn-fastapi-docker#advanced-usage).
from fastapi import FastAPI,Request
import trend_slope_computer as trend_slope_computer
import logging
import os
import pandas as pd
import json
import sys
app = FastAPI()

logging.basicConfig(stream=sys.stdout,
                    format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger("API")

# Map the LOG_LEVEL environment variable to a logging level. As before, an
# unset or unrecognized value leaves the logger at its default (NOTSET, i.e.
# effective WARNING), so INFO messages are only emitted when configured.
_LOG_LEVELS = {
    'INFO': logging.INFO,
    'WARNING': logging.WARNING,
    'DEBUG': logging.DEBUG,
}
_level = _LOG_LEVELS.get(os.getenv('LOG_LEVEL'))
if _level is not None:
    logger.setLevel(_level)
def execute(results, threshold, warmup):
    """Flatten Prometheus range-query results and evaluate the lag trend.

    Each entry of ``results`` is expected to hold a ``metric.group`` label and
    a list of ``[timestamp, value]`` pairs. The flattened samples are handed to
    ``trend_slope_computer.compute``; the experiment counts as successful when
    the resulting slope is below ``threshold``.

    Returns False when the slope computation fails, so a broken subexperiment
    does not abort the whole benchmark run.
    """
    rows = []
    for result in results:
        group = result['metric']['group']
        rows.extend(
            {'group': group,
             'timestamp': int(value[0]),
             # Prometheus may report 'NaN'; treat it as zero lag.
             'value': int(value[1]) if value[1] != 'NaN' else 0}
            for value in result['values'])

    df = pd.DataFrame(rows)
    logger.info(df)

    try:
        trend_slope = trend_slope_computer.compute(df, warmup)
    except Exception as e:
        err_msg = 'Computing trend slope failed'
        logger.exception(err_msg)
        logger.error('Mark this subexperiment as not successful and continue benchmark')
        return False

    logger.info("Trend Slope: %s", trend_slope)
    return trend_slope < threshold
@app.post("/evaluate-slope",response_model=bool)
async def evaluate_slope(request: Request):
    """Evaluate a subexperiment's lag trend.

    Expects a JSON body with the fields 'total_lag' (Prometheus query result),
    'threshold' (int) and 'warmup' (seconds to skip).
    """
    payload = json.loads(await request.body())
    return execute(payload['total_lag'], payload['threshold'], payload['warmup'])


logger.info("Slope evaluator is online")
\ No newline at end of file
from sklearn.linear_model import LinearRegression
import pandas as pd
import os
def compute(x, warmup_sec):
    """Compute the slope of a least-squares line fit of consumer lag over time.

    :param x: DataFrame with at least the columns 'timestamp' (epoch seconds)
        and 'value' (consumer lag). A derived 'sec_start' column (seconds since
        the first sample) is added to the frame as a side effect, as before.
    :param warmup_sec: number of leading seconds to discard as warm-up.
    :return: regression slope in lag units per second.
    """
    import numpy as np  # lightweight; numpy is already a pandas dependency

    data = x
    # Seconds elapsed since the first sample.
    data['sec_start'] = data.loc[0:, 'timestamp'] - data.iloc[0]['timestamp']
    regress = data.loc[data['sec_start'] >= warmup_sec]  # drop warm-up samples

    # Select columns by NAME. The frame built by the caller has the column
    # order (group, timestamp, value, sec_start), so the previous positional
    # selection iloc[:, 2]/iloc[:, 3] picked 'value' as X and 'sec_start' as Y,
    # i.e. regressed time against lag instead of lag against time.
    x_vals = regress['sec_start'].values  # independent variable: time
    y_vals = regress['value'].values      # dependent variable: lag

    # Degree-1 polynomial fit == ordinary least-squares linear regression.
    trend_slope, _ = np.polyfit(x_vals, y_vals, 1)
    return trend_slope
fastapi==0.55.1
scikit-learn==0.20.3
pandas==1.0.3
...@@ -20,12 +20,13 @@ dependencies { ...@@ -20,12 +20,13 @@ dependencies {
implementation 'io.quarkus:quarkus-resteasy' implementation 'io.quarkus:quarkus-resteasy'
testImplementation 'io.quarkus:quarkus-junit5' testImplementation 'io.quarkus:quarkus-junit5'
testImplementation 'io.rest-assured:rest-assured' testImplementation 'io.rest-assured:rest-assured'
implementation 'com.google.code.gson:gson:2.8.5'
implementation 'org.slf4j:slf4j-simple:1.7.29' implementation 'org.slf4j:slf4j-simple:1.7.29'
implementation 'io.github.microutils:kotlin-logging:1.12.0' implementation 'io.github.microutils:kotlin-logging:1.12.0'
implementation 'io.fabric8:kubernetes-client:5.0.0-alpha-2' implementation 'io.fabric8:kubernetes-client:5.0.0-alpha-2'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.7.0' implementation 'org.apache.kafka:kafka-clients:2.7.0'
compile group: 'org.apache.zookeeper', name: 'zookeeper', version: '3.6.2' implementation 'khttp:khttp:1.0.0'
} }
group 'theodolite' group 'theodolite'
......
...@@ -22,6 +22,10 @@ class BenchmarkExecution { ...@@ -22,6 +22,10 @@ class BenchmarkExecution {
class Slo { class Slo {
lateinit var sloType: String lateinit var sloType: String
var threshold by Delegates.notNull<Int>() var threshold by Delegates.notNull<Int>()
lateinit var prometheusUrl: String
lateinit var externalSloUrl: String
var offset by Delegates.notNull<Int>()
var warmup by Delegates.notNull<Int>()
} }
class LoadDefinition { class LoadDefinition {
......
package theodolite.evaluation

import com.google.gson.Gson
import khttp.post
import java.net.ConnectException
import java.time.Duration
import java.time.Instant

/**
 * [SloChecker] that delegates the SLO evaluation to an external REST service.
 *
 * Metric data is fetched from Prometheus for the experiment window and posted
 * as JSON to [externalSlopeURL]; the service replies with a boolean verdict.
 */
class ExternalSloChecker(
    private val prometheusURL: String,
    private val query: String,
    private val externalSlopeURL: String,
    private val threshold: Int,
    private val offset: Duration,
    private val warmup: Int
) :
    SloChecker {
    private val RETRIES = 2
    private val TIMEOUT = 60.0

    /**
     * Evaluates the experiment between [start] and [end], retrying the
     * external service up to [RETRIES] times on non-200 responses.
     *
     * @throws ConnectException when every attempt fails.
     */
    override fun evaluate(start: Instant, end: Instant): Boolean {
        val fetchedData = MetricFetcher(prometheusURL = prometheusURL, offset = offset)
            .fetchMetric(start, end, query)
        val payload = Gson().toJson(
            mapOf(
                "total_lag" to fetchedData.data?.result,
                "threshold" to threshold,
                "warmup" to warmup
            )
        )

        repeat(RETRIES) {
            val result = post(externalSlopeURL, data = payload, timeout = TIMEOUT)
            if (result.statusCode == 200) {
                return result.text.toBoolean()
            }
        }
        throw ConnectException("Could not reach slope evaluation")
    }
}
package theodolite.evaluation

import com.google.gson.Gson
import khttp.get
import khttp.responses.Response
import mu.KotlinLogging
import theodolite.util.PrometheusResponse
import java.net.ConnectException
import java.time.Duration
import java.time.Instant

private val logger = KotlinLogging.logger {}

/**
 * Fetches metrics from a Prometheus instance via its range-query HTTP API.
 *
 * @param prometheusURL base URL of the Prometheus server.
 * @param offset duration subtracted from both query bounds (e.g. to account
 *        for clock offsets between Theodolite and Prometheus).
 */
class MetricFetcher(private val prometheusURL: String, private val offset: Duration) {
    private val RETRIES = 2
    private val TIMEOUT = 60.0

    /**
     * Runs [query] against Prometheus for the window [start]..[end] (both
     * shifted by [offset]) with a fixed 5s step, retrying up to [RETRIES]
     * times on non-200 responses.
     *
     * @throws NoSuchFieldException when the query result is empty.
     * @throws ConnectException when Prometheus never answers successfully.
     */
    fun fetchMetric(start: Instant, end: Instant, query: String): PrometheusResponse {
        val offsetStart = start.minus(offset)
        val offsetEnd = end.minus(offset)

        var counter = 0
        val parameter = mapOf(
            "query" to query,
            "start" to offsetStart.toString(),
            "end" to offsetEnd.toString(),
            "step" to "5s"
        )

        while (counter < RETRIES) {
            val response = get("$prometheusURL/api/v1/query_range", params = parameter, timeout = TIMEOUT)
            if (response.statusCode != 200) {
                val message = response.jsonObject.toString()
                logger.warn { "Could not connect to Prometheus: $message, retrying now" }
                counter++
            } else {
                // Parse the body once and reuse it (it was previously parsed
                // a second time for the return value).
                val values = parseValues(response)
                if (values.data?.result.isNullOrEmpty()) {
                    logger.error { "Empty query result: $values" }
                    throw NoSuchFieldException("Empty query result for query '$query'")
                }
                return values
            }
        }
        throw ConnectException("No answer from Prometheus received")
    }

    /** Deserializes a Prometheus JSON response into a [PrometheusResponse]. */
    private fun parseValues(values: Response): PrometheusResponse {
        return Gson().fromJson<PrometheusResponse>(
            values.jsonObject.toString(),
            PrometheusResponse::class.java
        )
    }
}
package theodolite.evaluation

// NOTE(review): empty leftover interface — the functional contract lives in
// SloChecker (different capitalization) in this same package. Nothing in the
// visible code implements SLOChecker; consider removing it in a follow-up.
interface SLOChecker {}
package theodolite.evaluation

import java.time.Instant

/**
 * Checker deciding whether an experiment run satisfies its service-level
 * objective (SLO).
 */
interface SloChecker {
    /**
     * Evaluates the experiment that ran between [start] and [end].
     *
     * @return true if the SLO was met, false otherwise.
     */
    fun evaluate(start: Instant, end: Instant): Boolean
}
package theodolite.evaluation

import java.time.Duration

/**
 * Factory creating [SloChecker] instances for a given SLO type.
 */
class SloCheckerFactory {

    /**
     * Creates the [SloChecker] matching [slotype].
     *
     * Currently only the type "lag trend" is supported, which yields an
     * [ExternalSloChecker] configured with the given Prometheus and external
     * evaluation endpoints.
     *
     * @throws IllegalArgumentException for an unknown [slotype].
     */
    fun create(
        slotype: String,
        prometheusURL: String,
        query: String,
        externalSlopeURL: String,
        threshold: Int,
        offset: Duration,
        warmup: Int
    ): SloChecker = when (slotype) {
        "lag trend" -> ExternalSloChecker(
            prometheusURL = prometheusURL,
            query = query,
            externalSlopeURL = externalSlopeURL,
            threshold = threshold,
            offset = offset,
            warmup = warmup
        )
        else -> throw IllegalArgumentException("Slotype $slotype not found.")
    }
}
...@@ -2,6 +2,7 @@ package theodolite.execution ...@@ -2,6 +2,7 @@ package theodolite.execution
import mu.KotlinLogging import mu.KotlinLogging
import theodolite.benchmark.Benchmark import theodolite.benchmark.Benchmark
import theodolite.benchmark.BenchmarkExecution
import theodolite.util.ConfigurationOverride import theodolite.util.ConfigurationOverride
import theodolite.util.LoadDimension import theodolite.util.LoadDimension
import theodolite.util.Resource import theodolite.util.Resource
...@@ -22,7 +23,8 @@ abstract class BenchmarkExecutor( ...@@ -22,7 +23,8 @@ abstract class BenchmarkExecutor(
val benchmark: Benchmark, val benchmark: Benchmark,
val results: Results, val results: Results,
val executionDuration: Duration, val executionDuration: Duration,
configurationOverrides: List<ConfigurationOverride> configurationOverrides: List<ConfigurationOverride>,
val slo: BenchmarkExecution.Slo
) { ) {
/** /**
...@@ -39,9 +41,13 @@ abstract class BenchmarkExecutor( ...@@ -39,9 +41,13 @@ abstract class BenchmarkExecutor(
* *
*/ */
fun waitAndLog() { fun waitAndLog() {
for (i in 1.rangeTo(executionDuration.toMinutes())) { logger.info { "Execution of a new benchmark started." }
Thread.sleep(Duration.ofMinutes(1).toMillis()) for (i in 1.rangeTo(executionDuration.toSeconds())) {
logger.info { "Executed: $i minutes" }
Thread.sleep(Duration.ofSeconds(1).toMillis())
if ((i % 60) == 0L) {
logger.info { "Executed: ${i / 60} minutes" }
}
} }
} }
} }
package theodolite.execution package theodolite.execution
import mu.KotlinLogging
import theodolite.benchmark.Benchmark import theodolite.benchmark.Benchmark
import theodolite.benchmark.BenchmarkExecution
import theodolite.evaluation.SloCheckerFactory
import theodolite.util.ConfigurationOverride import theodolite.util.ConfigurationOverride
import theodolite.util.LoadDimension import theodolite.util.LoadDimension
import theodolite.util.Resource import theodolite.util.Resource
import theodolite.util.Results import theodolite.util.Results
import java.time.Duration import java.time.Duration
import java.time.Instant
private val logger = KotlinLogging.logger {}
class BenchmarkExecutorImpl( class BenchmarkExecutorImpl(
benchmark: Benchmark, benchmark: Benchmark,
results: Results, results: Results,
executionDuration: Duration, executionDuration: Duration,
private val configurationOverrides: List<ConfigurationOverride> private val configurationOverrides: List<ConfigurationOverride>,
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides) { slo: BenchmarkExecution.Slo
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo) {
//TODO ADD SHUTDOWN HOOK HERE
override fun runExperiment(load: LoadDimension, res: Resource): Boolean { override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides) val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides)
benchmarkDeployment.setup() benchmarkDeployment.setup()
this.waitAndLog() this.waitAndLog()
benchmarkDeployment.teardown() benchmarkDeployment.teardown()
// todo evaluate
val result = false // if success else false var result = false
try {
result = SloCheckerFactory().create(
slotype = slo.sloType,
prometheusURL = slo.prometheusUrl,
query = "sum by(group)(kafka_consumergroup_group_lag >= 0)",
externalSlopeURL = slo.externalSloUrl,
threshold = slo.threshold,
offset = Duration.ofHours(slo.offset.toLong()),
warmup = slo.warmup
)
.evaluate(
Instant.now().minus(executionDuration),
Instant.now()
)
} catch (e: Exception) {
logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" }
}
this.results.setResult(Pair(load, res), result) this.results.setResult(Pair(load, res), result)
return result return result
} }
......
...@@ -20,7 +20,14 @@ class TheodoliteExecutor( ...@@ -20,7 +20,14 @@ class TheodoliteExecutor(
val strategyFactory = StrategyFactory() val strategyFactory = StrategyFactory()
val executionDuration = Duration.ofSeconds(config.execution.duration) val executionDuration = Duration.ofSeconds(config.execution.duration)
val executor = BenchmarkExecutorImpl(kubernetesBenchmark, results, executionDuration, config.configOverrides) val executor =
BenchmarkExecutorImpl(
kubernetesBenchmark,
results,
executionDuration,
config.configOverrides,
config.slos[0]
)
return Config( return Config(
loads = config.load.loadValues.map { load -> LoadDimension(load, config.load.loadType) }, loads = config.load.loadValues.map { load -> LoadDimension(load, config.load.loadType) },
......
...@@ -5,6 +5,8 @@ import mu.KotlinLogging ...@@ -5,6 +5,8 @@ import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark import theodolite.benchmark.KubernetesBenchmark
import theodolite.util.YamlParser import theodolite.util.YamlParser
import kotlin.system.exitProcess
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@QuarkusMain(name = "TheodoliteYamlExecutor") @QuarkusMain(name = "TheodoliteYamlExecutor")
...@@ -23,5 +25,6 @@ object TheodoliteYamlExecutor { ...@@ -23,5 +25,6 @@ object TheodoliteYamlExecutor {
val executor = TheodoliteExecutor(benchmarkExecution, benchmark) val executor = TheodoliteExecutor(benchmarkExecution, benchmark)
executor.run() executor.run()
logger.info { "Theodolite finished" } logger.info { "Theodolite finished" }
exitProcess(0)
} }
} }
...@@ -4,6 +4,7 @@ import mu.KotlinLogging ...@@ -4,6 +4,7 @@ import mu.KotlinLogging
import org.apache.kafka.clients.admin.AdminClient import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.ListTopicsResult import org.apache.kafka.clients.admin.ListTopicsResult
import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.admin.NewTopic
import java.util.*
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
......
package theodolite.util

/**
 * Top-level shape of a Prometheus HTTP API response, deserialized by Gson
 * (hence mutable, nullable properties with defaults).
 */
data class PrometheusResponse(
    // Request status as reported by Prometheus, e.g. "success".
    var status: String? = null,
    // Payload of the response; null when absent from the JSON.
    var data: PromData? = null
)

/** The `data` object of a Prometheus response. */
data class PromData(
    // Result type reported by Prometheus (e.g. "matrix" for range queries).
    var resultType: String? = null,
    // One entry per time series returned by the query.
    var result: List<PromResult>? = null
)

/** A single time series within a Prometheus query result. */
data class PromResult(
    // Label set identifying the series.
    var metric: PromMetric? = null,
    // Sample pairs; left as Any because Gson decodes them as raw JSON arrays
    // ([timestamp, value]) — consumers re-serialize rather than inspect them.
    var values: List<Any>? = null
)

/** Labels used by Theodolite's queries; only the consumer group is mapped. */
data class PromMetric(
    var group: String? = null
)
...@@ -3,17 +3,21 @@ benchmark: "benchmarkType" ...@@ -3,17 +3,21 @@ benchmark: "benchmarkType"
load: load:
loadType: "NumSensors" loadType: "NumSensors"
loadValues: loadValues:
- 10000 - 50000
resources: resources:
resourceType: "Instances" resourceType: "Instances"
resourceValues: resourceValues:
- 2 - 1
slos: slos:
- sloType: "slo type" - sloType: "lag trend"
threshold: 1000 threshold: 1000
prometheusUrl: "http://localhost:32656"
externalSloUrl: "http://localhost:80/evaluate-slope"
offset: 0
warmup: 0
execution: execution:
strategy: "LinearSearch" strategy: "LinearSearch"
duration: 300 duration: 60
repetitions: 1 repetitions: 1
restrictions: restrictions:
- "LowerBound" - "LowerBound"
...@@ -33,7 +37,7 @@ configOverrides: ...@@ -33,7 +37,7 @@ configOverrides:
resource: "uc1-kstreams-deployment.yaml" resource: "uc1-kstreams-deployment.yaml"
container: "uc-application" container: "uc-application"
variableName: "cpu" variableName: "cpu"
value: "50m" value: "1000m"
- patcher: - patcher:
type: "ResourceLimitPatcher" type: "ResourceLimitPatcher"
resource: "uc1-kstreams-deployment.yaml" resource: "uc1-kstreams-deployment.yaml"
......
...@@ -3,6 +3,7 @@ package theodolite ...@@ -3,6 +3,7 @@ package theodolite
import io.quarkus.test.junit.QuarkusTest import io.quarkus.test.junit.QuarkusTest
import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import theodolite.benchmark.BenchmarkExecution
import theodolite.strategies.restriction.LowerBoundRestriction import theodolite.strategies.restriction.LowerBoundRestriction
import theodolite.strategies.searchstrategy.BinarySearch import theodolite.strategies.searchstrategy.BinarySearch
import theodolite.strategies.searchstrategy.CompositeStrategy import theodolite.strategies.searchstrategy.CompositeStrategy
...@@ -29,7 +30,8 @@ class CompositeStrategyTest { ...@@ -29,7 +30,8 @@ class CompositeStrategyTest {
val mockResources: List<Resource> = (0..6).map { number -> Resource(number, "Instances") } val mockResources: List<Resource> = (0..6).map { number -> Resource(number, "Instances") }
val results = Results() val results = Results()
val benchmark = TestBenchmark() val benchmark = TestBenchmark()
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results) val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker)
val linearSearch = LinearSearch(benchmarkExecutor) val linearSearch = LinearSearch(benchmarkExecutor)
val lowerBoundRestriction = LowerBoundRestriction(results) val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy = val strategy =
...@@ -61,8 +63,9 @@ class CompositeStrategyTest { ...@@ -61,8 +63,9 @@ class CompositeStrategyTest {
val mockResources: List<Resource> = (0..6).map { number -> Resource(number, "Instances") } val mockResources: List<Resource> = (0..6).map { number -> Resource(number, "Instances") }
val results = Results() val results = Results()
val benchmark = TestBenchmark() val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutorImpl = val benchmarkExecutorImpl =
TestBenchmarkExecutorImpl(mockResults, benchmark, results) TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker)
val binarySearch = BinarySearch(benchmarkExecutorImpl) val binarySearch = BinarySearch(benchmarkExecutorImpl)
val lowerBoundRestriction = LowerBoundRestriction(results) val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy = val strategy =
...@@ -94,7 +97,8 @@ class CompositeStrategyTest { ...@@ -94,7 +97,8 @@ class CompositeStrategyTest {
val mockResources: List<Resource> = (0..7).map { number -> Resource(number, "Instances") } val mockResources: List<Resource> = (0..7).map { number -> Resource(number, "Instances") }
val results = Results() val results = Results()
val benchmark = TestBenchmark() val benchmark = TestBenchmark()
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results) val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker)
val binarySearch = BinarySearch(benchmarkExecutor) val binarySearch = BinarySearch(benchmarkExecutor)
val lowerBoundRestriction = LowerBoundRestriction(results) val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy = val strategy =
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment