From c172480bd8cabc9b24156cdaf7ccfdd9bb0fa1b0 Mon Sep 17 00:00:00 2001
From: "stu126940@mail.uni-kiel.de" <stu126940@mail.uni-kiel.de>
Date: Wed, 10 Mar 2021 16:51:34 +0100
Subject: [PATCH] Beginn of the implementation of experiment evaluation

---
 MetricFetcher.kt                              | 33 +++++++++++++++++++
 SLOCheckerImpl.kt                             | 17 ++++++++++
 slope-evaluator/api.py                        | 32 ++++++++++++++++++
 slope-evaluator/trend_slope_computer.py       | 17 ++++++++++
 theodolite-quarkus/build.gradle               |  1 +
 .../theodolite/evaluation/MetricFetcher.kt    | 31 +++++++++++++++++
 .../theodolite/evaluation/SLOChecker.kt       |  5 ++-
 .../theodolite/evaluation/SLOCheckerImpl.kt   | 14 ++++++++
 .../execution/BenchmarkExecutorImpl.kt        |  8 ++++-
 .../execution/TheodoliteYamlExecutor.kt       |  3 ++
 .../kotlin/theodolite/k8s/TopicManager.kt     |  1 +
 .../resources/yaml/BenchmarkExecution.yaml    |  8 ++---
 12 files changed, 164 insertions(+), 6 deletions(-)
 create mode 100644 MetricFetcher.kt
 create mode 100644 SLOCheckerImpl.kt
 create mode 100644 slope-evaluator/api.py
 create mode 100644 slope-evaluator/trend_slope_computer.py
 create mode 100644 theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt
 create mode 100644 theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOCheckerImpl.kt

diff --git a/MetricFetcher.kt b/MetricFetcher.kt
new file mode 100644
index 000000000..cb5feac1e
--- /dev/null
+++ b/MetricFetcher.kt
@@ -0,0 +1,33 @@
+package theodolite.evaluation
+
+import khttp.get
+import java.util.*
+
+
+class MetricFetcher(private val prometheusURL: String) {
+
+    fun fetchMetric(start: Long, end: Long, query: String): Any {
+
+        val parameter = mapOf(
+            "query" to query,
+            "start" to toISODate(start),
+            "end" to toISODate(end),
+            "step" to "5s")
+
+        val response = get("$prometheusURL/api/v1/query_range", params = parameter)
+        val values = response.jsonObject.getJSONObject("data").getJSONArray("result").getJSONObject(0)["values"].toString()
+        return parseValues(values)
+        return values
+    }
+
+    private fun toISODate(timestamp: Long): String {
+        val sdf = java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'")
+        val date = Date(timestamp)
+        return sdf.format(date)
+    }
+
+    private fun parseValues(values: String): Any {
+        // TODO("pars with gson")
+        return ""
+    }
+}
\ No newline at end of file
diff --git a/SLOCheckerImpl.kt b/SLOCheckerImpl.kt
new file mode 100644
index 000000000..7687fc803
--- /dev/null
+++ b/SLOCheckerImpl.kt
@@ -0,0 +1,17 @@
+package theodolite.evaluation
+
+import khttp.get
+
+class SLOCheckerImpl(private val prometheusURL: String): SLOChecker {
+
+    override fun evaluate(start: Long, end: Long): Boolean {
+        val metricFetcher = MetricFetcher(prometheusURL = prometheusURL)
+        val totalLag = metricFetcher.fetchMetric(start, end, "sum by(group)(kafka_consumergroup_group_lag > 0)")
+        print(totalLag)
+        // val parameter = mapOf("" to "")
+        // val response = get("", params = parameter)
+        // TODO("call python evaluator")
+
+        return false
+    }
+}
\ No newline at end of file
diff --git a/slope-evaluator/api.py b/slope-evaluator/api.py
new file mode 100644
index 000000000..e3fa5b74c
--- /dev/null
+++ b/slope-evaluator/api.py
@@ -0,0 +1,32 @@
+from fastapi import FastAPI
+import trend_slope_computer as trend_slope_computer
+import logging
+import os
+import pandas as pd
+
+app = FastAPI()
+
+
+@app.get("/evaluate-slope")
+def evaluate_slope(total_lag):
+    print("request received")
+    print(total_lag)
+    execute(total_lag, 1000)
+    return {"suitable" : "false"}
+
+
+
+def execute(total_lag, threshold):
+    df = pd.DataFrame(total_lag)
+    try:
+        trend_slope = trend_slope_computer.compute(df, 60)
+    except Exception as e:
+        err_msg = 'Computing trend slope failed'
+        print(err_msg)
+        logging.exception(err_msg)
+        print('Mark this subexperiment as not successful and continue benchmark')
+        return False
+
+    print(f"Trend Slope: {trend_slope}")
+
+    return trend_slope < threshold
diff --git a/slope-evaluator/trend_slope_computer.py b/slope-evaluator/trend_slope_computer.py
new file mode 100644
index 000000000..ea6e7bacd
--- /dev/null
+++ b/slope-evaluator/trend_slope_computer.py
@@ -0,0 +1,17 @@
+from sklearn.linear_model import LinearRegression
+import pandas as pd
+import os
+
+def compute(input, warmup_sec):
+    input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp']
+    regress = input.loc[input['sec_start'] >= warmup_sec] # Warm-Up
+
+    X = regress.iloc[:, 2].values.reshape(-1, 1)  # values converts it into a numpy array
+    Y = regress.iloc[:, 3].values.reshape(-1, 1)  # -1 means that calculate the dimension of rows, but have 1 column
+    linear_regressor = LinearRegression()  # create object for the class
+    linear_regressor.fit(X, Y)  # perform linear regression
+    Y_pred = linear_regressor.predict(X)  # make predictions
+
+    trend_slope = linear_regressor.coef_[0][0]
+
+    return trend_slope
diff --git a/theodolite-quarkus/build.gradle b/theodolite-quarkus/build.gradle
index ba80ced4b..63d08a53f 100644
--- a/theodolite-quarkus/build.gradle
+++ b/theodolite-quarkus/build.gradle
@@ -26,6 +26,7 @@ dependencies {
     implementation 'io.fabric8:kubernetes-client:5.0.0-alpha-2'
     compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.7.0'
     compile group: 'org.apache.zookeeper', name: 'zookeeper', version: '3.6.2'
+    compile "khttp:khttp:1.0.0"
 }
 
 group 'theodolite'
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt
new file mode 100644
index 000000000..f64880411
--- /dev/null
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt
@@ -0,0 +1,31 @@
+package theodolite.evaluation
+
+import khttp.get
+import java.util.*
+
+
+class MetricFetcher(private val prometheusURL: String) {
+
+    fun fetchMetric(start: Long, end: Long, query: String): Any {
+
+        val parameter = mapOf(
+            "query" to query,
+            "start" to toISODate(start),
+            "end" to toISODate(end),
+            "step" to "5s")
+
+        val response = get("$prometheusURL/api/v1/query_range", params = parameter)
+        return response.jsonObject.getJSONObject("data").getJSONArray("result").getJSONObject(0)["values"]
+    }
+
+    private fun toISODate(timestamp: Long): String {
+        val sdf = java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'")
+        val date = Date(timestamp - (3600 * 1000))
+        return sdf.format(date)
+    }
+
+    private fun parseValues(values: String): Any {
+        // TODO("pars with gson")
+        return ""
+    }
+}
\ No newline at end of file
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOChecker.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOChecker.kt
index 396e1864b..5cab3e2e4 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOChecker.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOChecker.kt
@@ -1,3 +1,6 @@
 package theodolite.evaluation
 
-interface SLOChecker {}
+interface SLOChecker {
+    fun  evaluate(start: Long, end: Long): Boolean
+
+}
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOCheckerImpl.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOCheckerImpl.kt
new file mode 100644
index 000000000..594803f06
--- /dev/null
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SLOCheckerImpl.kt
@@ -0,0 +1,14 @@
+package theodolite.evaluation
+
+import khttp.get
+
+class SLOCheckerImpl(private val prometheusURL: String): SLOChecker {
+
+    override fun evaluate(start: Long, end: Long): Boolean {
+        val metricFetcher = MetricFetcher(prometheusURL = prometheusURL)
+        val totalLag = metricFetcher.fetchMetric(start, end, "sum by(group)(kafka_consumergroup_group_lag > 0)")
+        val parameter = mapOf("total_lag" to totalLag.toString())
+        val response = get("http://127.0.0.1:8000/evaluate-slope", params = parameter)
+        return response.jsonObject["suitable"] == "true"
+    }
+}
\ No newline at end of file
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt
index 45616a100..99f21294a 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt
@@ -1,11 +1,15 @@
 package theodolite.execution
 
 import theodolite.benchmark.Benchmark
+import theodolite.evaluation.SLOChecker
+import theodolite.evaluation.SLOCheckerImpl
 import theodolite.util.ConfigurationOverride
 import theodolite.util.LoadDimension
 import theodolite.util.Resource
 import theodolite.util.Results
+import java.sql.Time
 import java.time.Duration
+import java.time.Instant
 
 class BenchmarkExecutorImpl(
     benchmark: Benchmark,
@@ -19,7 +23,9 @@ class BenchmarkExecutorImpl(
         this.waitAndLog()
         benchmarkDeployment.teardown()
         // todo evaluate
-        val result = false // if success else false
+        val result = SLOCheckerImpl("http://localhost:32656")
+            .evaluate(Instant.now().toEpochMilli() - executionDuration.toMillis(),
+                Instant.now().toEpochMilli())
         this.results.setResult(Pair(load, res), result)
         return result
     }
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt
index ecc202542..f6c109dac 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt
@@ -5,6 +5,8 @@ import mu.KotlinLogging
 import theodolite.benchmark.BenchmarkExecution
 import theodolite.benchmark.KubernetesBenchmark
 import theodolite.util.YamlParser
+import kotlin.system.exitProcess
+
 private val logger = KotlinLogging.logger {}
 
 @QuarkusMain(name = "TheodoliteYamlExecutor")
@@ -23,5 +25,6 @@ object TheodoliteYamlExecutor {
         val executor = TheodoliteExecutor(benchmarkExecution, benchmark)
         executor.run()
         logger.info { "Theodolite finished" }
+        exitProcess(0)
     }
 }
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt
index 9cdf1c43f..1336f5751 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt
@@ -4,6 +4,7 @@ import mu.KotlinLogging
 import org.apache.kafka.clients.admin.AdminClient
 import org.apache.kafka.clients.admin.ListTopicsResult
 import org.apache.kafka.clients.admin.NewTopic
+import java.util.*
 
 private val logger = KotlinLogging.logger {}
 
diff --git a/theodolite-quarkus/src/main/resources/yaml/BenchmarkExecution.yaml b/theodolite-quarkus/src/main/resources/yaml/BenchmarkExecution.yaml
index 270daee17..f705b8362 100644
--- a/theodolite-quarkus/src/main/resources/yaml/BenchmarkExecution.yaml
+++ b/theodolite-quarkus/src/main/resources/yaml/BenchmarkExecution.yaml
@@ -3,17 +3,17 @@ benchmark: "benchmarkType"
 load:
   loadType: "NumSensors"
   loadValues:
-    - 10000
+    - 50000
 resources:
   resourceType: "Instances"
   resourceValues:
-    - 2
+    - 1
 slos:
   - sloType: "slo type"
     threshold: 1000
 execution:
   strategy: "LinearSearch"
-  duration: 300
+  duration: 60
   repetitions: 1
   restrictions:
     - "LowerBound"
@@ -33,7 +33,7 @@ configOverrides:
       resource: "uc1-kstreams-deployment.yaml"
       container: "uc-application"
       variableName: "cpu"
-    value: "50m"
+    value: "1000m"
   - patcher:
       type: "ResourceLimitPatcher"
       resource: "uc1-kstreams-deployment.yaml"
-- 
GitLab