From fa0af504620ffb402d1494b0729f4adb0bc8083b Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Tue, 2 Feb 2021 21:08:31 +0100
Subject: [PATCH] Fix creatOrReplace and workloadGenrator deletion, but not
 kafka

---
 execution/infrastructure/kafka/values.yaml    |  6 +--
 .../infrastructure/kafka/values_new.yaml      | 15 ++++++++
 .../execution/TheodoliteExecutor.kt           | 38 ++++++++++++-------
 .../kotlin/theodolite/k8s/ConfigMapManager.kt |  2 +-
 .../theodolite/k8s/DeploymentManager.kt       |  2 +-
 .../kotlin/theodolite/k8s/ServiceManager.kt   |  2 +-
 .../kotlin/theodolite/k8s/UC1Benchmark.kt     | 24 +++++++++---
 .../kotlin/theodolite/util/TestBenchmark.kt   | 26 ++++++-------
 8 files changed, 76 insertions(+), 39 deletions(-)
 create mode 100644 execution/infrastructure/kafka/values_new.yaml

diff --git a/execution/infrastructure/kafka/values.yaml b/execution/infrastructure/kafka/values.yaml
index e65a5fc56..d20b04918 100644
--- a/execution/infrastructure/kafka/values.yaml
+++ b/execution/infrastructure/kafka/values.yaml
@@ -3,7 +3,7 @@
 ## ------------------------------------------------------
 cp-zookeeper:
   enabled: true
-  servers: 3
+  servers: 1
   image: confluentinc/cp-zookeeper
   imageTag: 5.4.0
   ## Optionally specify an array of imagePullSecrets. Secrets must be manually created in the namespace.
@@ -28,7 +28,7 @@ cp-zookeeper:
 ## ------------------------------------------------------
 cp-kafka:
   enabled: true
-  brokers: 10
+  brokers: 1
   image: confluentinc/cp-enterprise-kafka
   imageTag: 5.4.0
   ## Optionally specify an array of imagePullSecrets. Secrets must be manually created in the namespace.
@@ -48,7 +48,7 @@ cp-kafka:
   #   cpu: 100m
   #   memory: 128Mi
   configurationOverrides:
-    #"offsets.topic.replication.factor": "3"
+    offsets.topic.replication.factor: "1"
     "message.max.bytes": "134217728" # 128 MB
     "replica.fetch.max.bytes": "134217728" # 128 MB
     # "default.replication.factor": 3
diff --git a/execution/infrastructure/kafka/values_new.yaml b/execution/infrastructure/kafka/values_new.yaml
new file mode 100644
index 000000000..41b111f32
--- /dev/null
+++ b/execution/infrastructure/kafka/values_new.yaml
@@ -0,0 +1,15 @@
+cp-helm-charts:
+    ## ------------------------------------------------------
+    ## Zookeeper
+    ## ------------------------------------------------------
+    cp-zookeeper:
+      servers: 1 # default: 3 
+
+  ## ------------------------------------------------------
+  ## Kafka
+  ## ------------------------------------------------------
+    cp-kafka:
+        brokers: 1 # deauflt: 10
+
+        configurationOverrides:
+          offsets.topic.replication.factor: "3"
\ No newline at end of file
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt
index 86f3baf84..73a527bcb 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt
@@ -4,37 +4,47 @@ import theodolite.k8s.UC1Benchmark
 import theodolite.strategies.restriction.LowerBoundRestriction
 import theodolite.strategies.searchstrategy.CompositeStrategy
 import theodolite.strategies.searchstrategy.LinearSearch
-import theodolite.util.*
+import theodolite.util.Config
+import theodolite.util.LoadDimension
+import theodolite.util.Resource
+import theodolite.util.Results
 import java.time.Duration
 
 class TheodoliteExecutor() {
+    val path = "/home/lorenz/git/spesb/theodolite-quarkus/src/main/resources/yaml"
     private fun loadConfig(): Config {
         val benchmark: UC1Benchmark = UC1Benchmark(
-            UC1Benchmark.UC1BenchmarkConfig(
-                zookeeperConnectionString = "my-confluent-cp-zookeeper:2181",
-                kafkaIPConnectionString = "my-confluent-cp-kafka:9092",
+            UC1Benchmark.UC1BenchmarkConfig(    // use port forward 2181 -> 2181
+                zookeeperConnectionString = "127.0.0.1:2181", //"my-confluent-cp-zookeeper:2181", //localhost:2181.
+                kafkaIPConnectionString = "localhost:9093",//"my-confluent-cp-kafka:","178.18.0."
                 schemaRegistryConnectionString = "http://my-confluent-cp-schema-registry:8081",
                 kafkaPartition = 40,
-                kafkaReplication = 3,
+                kafkaReplication = 1,
                 kafkaTopics = listOf("input", "output"),
                 // TODO("handle path in a more nice way (not absolut)")
-                ucDeploymentPath = "src/main/resources/yaml/aggregation-deployment.yaml",
-                ucServicePath = "src/main/resources/yaml/aggregation-service.yaml",
-                wgDeploymentPath = "src/main/resources/yaml/workloadGenerator.yaml",
+                ucDeploymentPath = path + "/aggregation-deployment.yaml",
+                ucServicePath = path + "/aggregation-service.yaml",
+                wgDeploymentPath = path + "/workloadGenerator.yaml",
                 ucImageURL = "ghcr.io/cau-se/theodolite-uc1-kstreams-app:latest",
                 wgImageURL = "ghcr.io/cau-se/theodolite-uc1-kstreams-workload-generator:latest"
-        ))
+            )
+        )
         val results: Results = Results()
-        val executionDuration = Duration.ofSeconds(60*5 )
+        val executionDuration = Duration.ofSeconds(60 * 5)
         val executor: BenchmarkExecutor = KafkaBenchmarkExecutor(benchmark, results, executionDuration)
 
         val restrictionStrategy = LowerBoundRestriction(results)
         val searchStrategy = LinearSearch(executor, results)
 
         return Config(
-            loads = (0..6).map{ number -> LoadDimension(number) },
-            resources = (0..6).map{ number -> Resource(number) },
-            compositeStrategy = CompositeStrategy(executor, searchStrategy, restrictionStrategies = setOf(restrictionStrategy), results = results),
+            loads = (0..6).map { number -> LoadDimension(number) },
+            resources = (0..6).map { number -> Resource(number) },
+            compositeStrategy = CompositeStrategy(
+                executor,
+                searchStrategy,
+                restrictionStrategies = setOf(restrictionStrategy),
+                results = results
+            ),
             executionDuration = executionDuration
         )
     }
@@ -44,7 +54,7 @@ class TheodoliteExecutor() {
         val config = this.loadConfig()
 
         // execute benchmarks for each load
-        for(load in config.loads) {
+        for (load in config.loads) {
             config.compositeStrategy.findSuitableResources(load, config.resources)
         }
 
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/ConfigMapManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/ConfigMapManager.kt
index fe11085c6..7fbf604c4 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/ConfigMapManager.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/ConfigMapManager.kt
@@ -11,7 +11,7 @@ class ConfigMapManager(client: NamespacedKubernetesClient) {
     }
 
     fun deploy(configMap: ConfigMap) {
-        client.configMaps().create(configMap)
+        client.configMaps().createOrReplace(configMap)
     }
 
     fun delete(configMap: ConfigMap) {
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/DeploymentManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/DeploymentManager.kt
index f885b3bf2..9f6f3ab7a 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/DeploymentManager.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/DeploymentManager.kt
@@ -75,7 +75,7 @@ class DeploymentManager(client: NamespacedKubernetesClient) {
 
     // TODO potential add exception handling
     fun deploy(deployment: Deployment) {
-        client.apps().deployments().create(deployment)
+        client.apps().deployments().createOrReplace(deployment)
     }
 
     // TODO potential add exception handling
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/ServiceManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/ServiceManager.kt
index 16e5bbd64..ed262d57a 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/ServiceManager.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/ServiceManager.kt
@@ -18,7 +18,7 @@ class ServiceManager(client: NamespacedKubernetesClient) {
     }
 
     fun deploy(service: Service) {
-        client.services().create(service)
+        client.services().createOrReplace(service)
     }
 
     fun delete(service: Service) {
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/UC1Benchmark.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/UC1Benchmark.kt
index 128fefd2a..42e7d2b67 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/UC1Benchmark.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/UC1Benchmark.kt
@@ -11,6 +11,7 @@ import theodolite.util.Resource
 class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
     val workloadGeneratorStateCleaner: WorkloadGeneratorStateCleaner
     val topicManager: TopicManager
+
     // TODO("service monitor")
     val kubernetesClient: NamespacedKubernetesClient
     val yamlLoader: YamlLoader
@@ -23,7 +24,7 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
     init {
         this.workloadGeneratorStateCleaner = WorkloadGeneratorStateCleaner(this.config.zookeeperConnectionString)
         this.topicManager = TopicManager(this.config.kafkaIPConnectionString)
-        this.kubernetesClient = DefaultKubernetesClient()
+        this.kubernetesClient = DefaultKubernetesClient().inNamespace("default")
         this.yamlLoader = YamlLoader(this.kubernetesClient)
         this.deploymentManager = DeploymentManager(this.kubernetesClient)
         this.serviceManager = ServiceManager(this.kubernetesClient)
@@ -39,9 +40,15 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
     }
 
     override fun initializeClusterEnvironment() {
-        this.workloadGeneratorStateCleaner.deleteAll()
+        // this.workloadGeneratorStateCleaner.deleteAll()
+        // since the workloadGenerators are not started they cant be deleted
+
         this.topicManager.deleteTopics(this.config.kafkaTopics)
-        this.topicManager.createTopics(this.config.kafkaTopics, this.config.kafkaPartition, this.config.kafkaReplication)
+        this.topicManager.createTopics(
+            this.config.kafkaTopics,
+            this.config.kafkaPartition,
+            this.config.kafkaReplication
+        )
     }
 
     override fun startSUT(resources: Resource) {
@@ -49,13 +56,13 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
 
         // set environment variables
         val environmentVariables: MutableMap<String, String> = mutableMapOf()
-        environmentVariables.put("KAFKA_BOOTSTRAP_SERVER", this.config.kafkaIPConnectionString)
+        environmentVariables.put("KAFKA_BOOTSTRAP_SERVERS", this.config.kafkaIPConnectionString)
         environmentVariables.put("SCHEMA_REGISTRY_URL", this.config.schemaRegistryConnectionString)
 
 
         // setup deployment
         this.deploymentManager.setReplica(ucDeployment, resources.get())
-        this.deploymentManager.setWorkloadEnv(ucDeployment,"uc-application", environmentVariables)
+        this.deploymentManager.setWorkloadEnv(ucDeployment, "uc-application", environmentVariables)
 
         // create kubernetes resources
         this.deploymentManager.deploy(ucDeployment)
@@ -69,6 +76,11 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
         // TODO ("calculate number of required instances")
         val requiredInstances: Int = 1
         val environmentVariables: MutableMap<String, String> = mutableMapOf()
+        environmentVariables.put("KAFKA_BOOTSTRAP_SERVERS", this.config.kafkaIPConnectionString)
+        environmentVariables.put("ZK_HOST", this.config.schemaRegistryConnectionString.split(":")[0])
+        environmentVariables.put("ZK_PORT", this.config.schemaRegistryConnectionString.split(":")[1])
+
+
         environmentVariables.put("NUM_SENSORS", load.get().toString())
         environmentVariables.put("NUM_INSTANCES", requiredInstances.toString())
 
@@ -87,5 +99,5 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
         val wgDeploymentPath: String,
         val ucImageURL: String,
         val wgImageURL: String
-        ) {}
+    ) {}
 }
\ No newline at end of file
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/util/TestBenchmark.kt b/theodolite-quarkus/src/main/kotlin/theodolite/util/TestBenchmark.kt
index 8d4dbbf51..816bd65f1 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/util/TestBenchmark.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/util/TestBenchmark.kt
@@ -1,15 +1,15 @@
 package theodolite.util
 
-class TestBenchmark: Benchmark(config = emptyMap()) {
-    override fun start() {
-        TODO("Not yet implemented")
-    }
-
-    override fun clearClusterEnvironment() {
-        TODO("Not yet implemented")
-    }
-
-    override fun startWorkloadGenerator(wg: String, dimValue: Int, ucId: String) {
-        TODO("Not yet implemented")
-    }
-}
\ No newline at end of file
+//class TestBenchmark: Benchmark(config = emptyMap()) {
+//    override fun start() {
+//        TODO("Not yet implemented")
+//    }
+//
+//    override fun clearClusterEnvironment() {
+//        TODO("Not yet implemented")
+//    }
+//
+//    override fun startWorkloadGenerator(wg: String, dimValue: Int, ucId: String) {
+//        TODO("Not yet implemented")
+//    }
+//}
\ No newline at end of file
-- 
GitLab