From 4a2d4763cbcc381fccf43e205d399682ea1dd100 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Sat, 20 Mar 2021 13:36:09 +0100
Subject: [PATCH] Reworked Analysis: Added AnalysisExecutor

---
 .../theodolite/evaluation/AnalysisExecutor.kt | 58 +++++++++++++++++++
 .../theodolite/evaluation/CsvExporter.kt      | 14 +++--
 .../evaluation/ExternalSloChecker.kt          |  9 ++-
 .../theodolite/evaluation/MetricFetcher.kt    |  2 +-
 .../theodolite/evaluation/SloChecker.kt       |  3 +-
 .../execution/BenchmarkExecutorImpl.kt        | 21 +------
 6 files changed, 77 insertions(+), 30 deletions(-)
 create mode 100644 theodolite-quarkus/src/main/kotlin/theodolite/evaluation/AnalysisExecutor.kt

diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/AnalysisExecutor.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/AnalysisExecutor.kt
new file mode 100644
index 000000000..9d061d7ff
--- /dev/null
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/AnalysisExecutor.kt
@@ -0,0 +1,58 @@
+package theodolite.evaluation
+
+import mu.KotlinLogging
+import theodolite.benchmark.BenchmarkExecution
+import theodolite.util.LoadDimension
+import theodolite.util.PrometheusResponse
+import theodolite.util.Resource
+import java.time.Duration
+import java.time.Instant
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * Executes the Analysis.
+ */
+class AnalysisExecutor {
+
+    fun analyse(load:LoadDimension,executionDuration: Duration, res: Resource,slo: BenchmarkExecution.Slo): Boolean {
+        var result = false
+
+        try {
+
+            val prometheusData = fetch(Instant.now().minus(executionDuration),
+                Instant.now(),
+                slo,
+                "sum by(group)(kafka_consumergroup_group_lag >= 0)")
+
+            CsvExporter().toCsv("${load.get()}_${res.get()}_${slo.sloType}",prometheusData)
+
+            val sloChecker = SloCheckerFactory().create(
+                slotype = slo.sloType,
+                prometheusURL = slo.prometheusUrl,
+                query = "sum by(group)(kafka_consumergroup_group_lag >= 0)",
+                externalSlopeURL = slo.externalSloUrl,
+                threshold = slo.threshold,
+                offset = Duration.ofHours(slo.offset.toLong()),
+                warmup = slo.warmup
+            )
+
+           result =  sloChecker.evaluate(start = Instant.now().minus(executionDuration),
+                end = Instant.now(),fetchedData = prometheusData)
+
+        } catch (e: Exception) {
+            logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" }
+        }
+
+        return result
+    }
+
+    fun fetch(start: Instant, end: Instant,slo: BenchmarkExecution.Slo,query: String): PrometheusResponse {
+
+        val metricFetcher = MetricFetcher(prometheusURL = slo.prometheusUrl,
+            offset = Duration.ofHours(slo.offset.toLong()))
+        val fetchedData = metricFetcher.fetchMetric(start, end, query)
+
+        return fetchedData
+    }
+}
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/CsvExporter.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/CsvExporter.kt
index 929a7914f..116bfdfd8 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/CsvExporter.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/CsvExporter.kt
@@ -1,31 +1,35 @@
 package theodolite.evaluation
 
+import mu.KotlinLogging
 import theodolite.util.PrometheusResponse
 import java.io.File
 import java.io.PrintWriter
 
+private val logger = KotlinLogging.logger {}
+
 class CsvExporter {
 
     /**
      * Uses the PrintWriter to transform a PrometheusResponse to Csv
      */
     fun toCsv(name : String,prom: PrometheusResponse){
-        val x = toArray(prom)
-        val csvOutputFile: File = File(name+".csv")
+        val responseArray = toArray(prom)
+        val csvOutputFile: File = File("$name.csv")
 
         PrintWriter(csvOutputFile).use { pw ->
             pw.println(listOf("name","time","value").joinToString())
-            x.forEach{
+            responseArray.forEach{
                 pw.println(it.joinToString())
             }
         }
+        logger.debug{csvOutputFile.absolutePath}
+        logger.info { "Wrote csv to $name" }
     }
 
     /**
      * Converts a PrometheusResponse into a List of List of Strings
      */
     private fun toArray(prom : PrometheusResponse): MutableList<List<String>> {
-
         val name = prom.data?.result?.get(0)?.metric?.group.toString()
         val values = prom.data?.result?.get(0)?.values
         val dataList = mutableListOf<List<String>>()
@@ -33,11 +37,9 @@ class CsvExporter {
         if (values != null) {
             for (x in values){
                 val y = x as List<*>
-
                 dataList.add(listOf(name,"${y[0]}","${y[1]}"))
             }
         }
-
         return dataList
     }
 }
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt
index 2de8e2dc9..4cf417bc7 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt
@@ -2,6 +2,8 @@ package theodolite.evaluation
 
 import com.google.gson.Gson
 import khttp.post
+import mu.KotlinLogging
+import theodolite.util.PrometheusResponse
 import java.net.ConnectException
 import java.time.Duration
 import java.time.Instant
@@ -19,10 +21,10 @@ class ExternalSloChecker(
     private val RETRIES = 2
     private val TIMEOUT = 60.0
 
-    override fun evaluate(start: Instant, end: Instant): Boolean {
+    private val logger = KotlinLogging.logger {}
+
+    override fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean {
         var counter = 0
-        val metricFetcher = MetricFetcher(prometheusURL = prometheusURL, offset = offset)
-        val fetchedData = metricFetcher.fetchMetric(start, end, query)
         val data =
             Gson().toJson(mapOf("total_lag" to fetchedData.data?.result, "threshold" to threshold, "warmup" to warmup))
 
@@ -30,6 +32,7 @@ class ExternalSloChecker(
             val result = post(externalSlopeURL, data = data, timeout = TIMEOUT)
             if (result.statusCode != 200) {
                 counter++
+                logger.error{"Could not reach external slope analysis"}
             } else {
                 return result.text.toBoolean()
             }
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt
index 7dbaf568c..19a8bbe9b 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt
@@ -37,7 +37,7 @@ class MetricFetcher(private val prometheusURL: String, private val offset: Durat
             } else {
                 val values = parseValues(response)
                 if (values.data?.result.isNullOrEmpty()) {
-                    logger.error { "Empty query result: $values" }
+                    logger.error { "Empty query result: $values between $start and $end for querry $query" }
                     throw NoSuchFieldException()
                 }
                 return parseValues(response)
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SloChecker.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SloChecker.kt
index 53ed1b7fa..66ea1d201 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SloChecker.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SloChecker.kt
@@ -1,7 +1,8 @@
 package theodolite.evaluation
 
+import theodolite.util.PrometheusResponse
 import java.time.Instant
 
 interface SloChecker {
-    fun evaluate(start: Instant, end: Instant): Boolean
+    fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean
 }
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt
index 458ff108a..0795e9893 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt
@@ -3,6 +3,7 @@ package theodolite.execution
 import mu.KotlinLogging
 import theodolite.benchmark.Benchmark
 import theodolite.benchmark.BenchmarkExecution
+import theodolite.evaluation.AnalysisExecutor
 import theodolite.evaluation.SloCheckerFactory
 import theodolite.util.ConfigurationOverride
 import theodolite.util.LoadDimension
@@ -20,30 +21,12 @@ class BenchmarkExecutorImpl(
     private val configurationOverrides: List<ConfigurationOverride>,
     slo: BenchmarkExecution.Slo
 ) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo) {
-    //TODO ADD SHUTDOWN HOOK HERE
     override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
         val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides)
         benchmarkDeployment.setup()
         this.waitAndLog()
 
-        var result = false
-        try {
-            result = SloCheckerFactory().create(
-                slotype = slo.sloType,
-                prometheusURL = slo.prometheusUrl,
-                query = "sum by(group)(kafka_consumergroup_group_lag >= 0)",
-                externalSlopeURL = slo.externalSloUrl,
-                threshold = slo.threshold,
-                offset = Duration.ofHours(slo.offset.toLong()),
-                warmup = slo.warmup
-            )
-                .evaluate(
-                    Instant.now().minus(executionDuration),
-                    Instant.now()
-                )
-        } catch (e: Exception) {
-            logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" }
-        }
+        var result = AnalysisExecutor().analyse(load,executionDuration,res,slo)
 
         benchmarkDeployment.teardown()
 
-- 
GitLab