Skip to content
Snippets Groups Projects
Commit 2ec424e5 authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

merge feature yaml handling from lorenz

parents df6d8e1a e7f239d4
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus,!78Resolve "Implement Quarkus/Kotlin protype"
Showing
with 374 additions and 11 deletions
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-aggregation
spec:
selector:
matchLabels:
app: titan-ccp-aggregation
replicas: 1
template:
metadata:
labels:
app: titan-ccp-aggregation
spec:
terminationGracePeriodSeconds: 0
containers:
- name: uc-application
image: uc-app:latest
ports:
- containerPort: 5555
name: jmx
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: JAVA_OPTS
value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555"
- name: COMMIT_INTERVAL_MS # Set as default for the applications
value: "100"
resources:
limits:
memory: 4Gi
cpu: 1000m
- name: prometheus-jmx-exporter
image: "solsson/kafka-prometheus-jmx-exporter@sha256:6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143"
command:
- java
- -XX:+UnlockExperimentalVMOptions
- -XX:+UseCGroupMemoryLimitForHeap
- -XX:MaxRAMFraction=1
- -XshowSettings:vm
- -jar
- jmx_prometheus_httpserver.jar
- "5556"
- /etc/jmx-aggregation/jmx-kafka-prometheus.yml
ports:
- containerPort: 5556
volumeMounts:
- name: jmx-config
mountPath: /etc/jmx-aggregation
volumes:
- name: jmx-config
configMap:
name: aggregation-jmx-configmap
apiVersion: v1
kind: Service
metadata:
name: titan-ccp-aggregation
labels:
app: titan-ccp-aggregation
spec:
#type: NodePort
selector:
app: titan-ccp-aggregation
ports:
- name: http
port: 80
targetPort: 80
protocol: TCP
- name: metrics
port: 9980
apiVersion: v1
kind: ConfigMap
metadata:
name: aggregation-jmx-configmap
data:
jmx-kafka-prometheus.yml: |+
jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi
lowercaseOutputName: true
lowercaseOutputLabelNames: true
ssl: false
apiVersion: batch/v1
kind: Job
metadata:
name: theodolite
spec:
template:
spec:
volumes:
- name: theodolite-pv-storage
persistentVolumeClaim:
claimName: theodolite-pv-claim
containers:
- name: theodolite
image: bvonheid/theodolite:latest
# imagePullPolicy: Never # Used to pull "own" local image
env:
- name: UC # mandatory
value: "1"
- name: LOADS # mandatory
value: "100000, 200000"
- name: INSTANCES # mandatory
value: "1, 2, 3"
# - name: DURATION
# value: "5"
# - name: PARTITIONS
# value: "40"
# - name: DOMAIN_RESTRICTION
# value: "True"
# - name: SEARCH_STRATEGY
# value: "linear-search"
# - name: CPU_LIMIT
# value: "1000m"
# - name: MEMORY_LIMIT
# value: "4Gi"
- name: PROMETHEUS_BASE_URL
value: "http://prometheus-operated:9090"
# - name: NAMESPACE
# value: "default"
# - name: CONFIGURATIONS
# value: "COMMIT_INTERVAL_MS=100, NUM_STREAM_THREADS=1"
- name: RESULT_PATH
value: "results"
- name: PYTHONUNBUFFERED # Enable logs in Kubernetes
value: "1"
volumeMounts:
- mountPath: "/app/results"
name: theodolite-pv-storage
restartPolicy: Never
# Uncomment if RBAC is enabled and configured
# serviceAccountName: theodolite
backoffLimit: 4
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-load-generator
spec:
selector:
matchLabels:
app: titan-ccp-load-generator
replicas: 1
template:
metadata:
labels:
app: titan-ccp-load-generator
spec:
terminationGracePeriodSeconds: 0
containers:
- name: workload-generator
image: workload-generator:latest
env:
# Order need to be preserved for run_uc.py
- name: NUM_SENSORS
value: "25000"
- name: INSTANCES
value: "1"
- name: NUM_NESTED_GROUPS
value: "5"
- name: ZK_HOST
value: "my-confluent-cp-zookeeper"
- name: ZK_PORT
value: "2181"
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
package theodolite package theodolite
import io.quarkus.runtime.annotations.QuarkusMain import io.quarkus.runtime.annotations.QuarkusMain
import theodolite.execution.TheodoliteExecutor import theodolite.execution.TheodoliteExecutor
import mu.KotlinLogging import mu.KotlinLogging
......
...@@ -11,7 +11,7 @@ private val logger = KotlinLogging.logger {} ...@@ -11,7 +11,7 @@ private val logger = KotlinLogging.logger {}
class KafkaBenchmarkExecutor(benchmark: Benchmark, results: Results, executionDuration: Duration) : BenchmarkExecutor(benchmark, results, executionDuration) { class KafkaBenchmarkExecutor(benchmark: Benchmark, results: Results, executionDuration: Duration) : BenchmarkExecutor(benchmark, results, executionDuration) {
override fun runExperiment(load: LoadDimension, res: Resource): Boolean { override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
benchmark.start() benchmark.start(load, res)
this.waitAndLog() this.waitAndLog()
benchmark.stop() benchmark.stop()
// todo evaluate // todo evaluate
......
...@@ -8,7 +8,7 @@ import java.time.Duration ...@@ -8,7 +8,7 @@ import java.time.Duration
class TheodoliteExecutor() { class TheodoliteExecutor() {
private fun loadConfig(): Config { private fun loadConfig(): Config {
val benchmark: Benchmark = KafkaBenchmark(emptyMap()) val benchmark: KafkaBenchmark = KafkaBenchmark(emptyMap())
val results: Results = Results() val results: Results = Results()
val executionDuration = Duration.ofSeconds(60*5 ) val executionDuration = Duration.ofSeconds(60*5 )
val executor: BenchmarkExecutor = KafkaBenchmarkExecutor(benchmark, results, executionDuration) val executor: BenchmarkExecutor = KafkaBenchmarkExecutor(benchmark, results, executionDuration)
......
package theodolite.k8s
import io.fabric8.kubernetes.api.model.ConfigMap
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
class ConfigMapManager(client: NamespacedKubernetesClient) {
var client: NamespacedKubernetesClient
init {
this.client = client
}
fun deploy(configMap: ConfigMap) {
client.configMaps().create(configMap)
}
fun delete(configMap: ConfigMap) {
client.configMaps().delete(configMap)
}
}
package theodolite.k8s
import io.fabric8.kubernetes.api.model.Container
import io.fabric8.kubernetes.api.model.EnvVar
import io.fabric8.kubernetes.api.model.EnvVarSource
import io.fabric8.kubernetes.api.model.Quantity
import io.fabric8.kubernetes.api.model.apps.Deployment
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import mu.KotlinLogging
private val logger = KotlinLogging.logger {}
class DeploymentManager(client: NamespacedKubernetesClient) {
var client: NamespacedKubernetesClient
init {
this.client = client
}
/**
* Sets the ContainerEvironmentVariables, creates new if variable don t exist.
* @param container - The Container
* @param map - Map of k=Name,v =Value of EnviromentVariables
*/
private fun setContainerEnv(container: Container, map: Map<String, String>) {
map.forEach { k, v ->
// filter for mathing name and set value
val x = container.env.filter { envVar -> envVar.name == k }
if (x.isEmpty()) {
val newVar = EnvVar(k, v, EnvVarSource())
container.env.add(newVar)
} else {
x.forEach {
it.value = v
}
}
}
}
/**
* Set the enviroment Variable for a container
*/
fun setWorkloadEnv(workloadDeployment: Deployment, containerName: String, map: Map<String, String>) {
workloadDeployment.spec.template.spec.containers.filter { it.name == containerName }
.forEach { it: Container ->
setContainerEnv(it, map)
}
}
/**
* Change the RessourceLimit of a container (Usally SUT)
*/
fun changeRessourceLimits(deployment: Deployment, ressource: String, containerName: String, limit: String) {
deployment.spec.template.spec.containers.filter { it.name == containerName }.forEach {
it.resources.limits.replace(ressource, Quantity(limit))
}
}
/**
* Change the image name of a container (SUT and the Worklaodgenerators)
*/
fun setImageName(deployment: Deployment, containerName: String, image: String) {
deployment.spec.template.spec.containers.filter { it.name == containerName }.forEach {
it.image = image
}
}
// TODO potential add exception handling
fun deploy(deployment: Deployment) {
client.apps().deployments().create(deployment)
}
// TODO potential add exception handling
fun delete(deployment: Deployment) {
client.apps().deployments().delete(deployment)
}
}
package theodolite.k8s
import io.fabric8.kubernetes.api.model.Service
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
class ServiceManager(client: NamespacedKubernetesClient) {
var client: NamespacedKubernetesClient
init {
this.client = client
}
fun changeServiceName(service: Service, newName: String) {
service.metadata.apply {
name = newName
}
}
fun deploy(service: Service) {
client.services().create(service)
}
fun delete(service: Service) {
client.services().delete(service)
}
}
...@@ -12,7 +12,7 @@ class TopicManager(boostrapIp: String) { ...@@ -12,7 +12,7 @@ class TopicManager(boostrapIp: String) {
init { init {
try { try {
kafkaAdmin = AdminClient.create(props) kafkaAdmin = AdminClient.create(props)
} catch (e : Exception) { } catch (e: Exception) {
System.out.println(e.toString()) System.out.println(e.toString())
} }
} }
...@@ -45,7 +45,7 @@ class TopicManager(boostrapIp: String) { ...@@ -45,7 +45,7 @@ class TopicManager(boostrapIp: String) {
try { try {
result.all().get() result.all().get()
} catch (ex:Exception) { } catch (ex: Exception) {
System.out.println(ex.toString()) System.out.println(ex.toString())
} }
System.out.println("Topics deleted") System.out.println("Topics deleted")
......
package theodolite package theodolite.k8s
import org.apache.zookeeper.KeeperException import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.WatchedEvent import org.apache.zookeeper.WatchedEvent
...@@ -15,7 +15,7 @@ class WorkloadGeneratorStateCleaner(ip: String) { ...@@ -15,7 +15,7 @@ class WorkloadGeneratorStateCleaner(ip: String) {
try { try {
val watcher: Watcher = ZookeperWatcher() // defined below val watcher: Watcher = ZookeperWatcher() // defined below
zookeeperClient = ZooKeeper(ip, sessionTimeout, watcher) zookeeperClient = ZooKeeper(ip, sessionTimeout, watcher)
} catch (e:Exception) { } catch (e: Exception) {
System.out.println(e.toString()) System.out.println(e.toString())
} }
} }
...@@ -33,12 +33,14 @@ class WorkloadGeneratorStateCleaner(ip: String) { ...@@ -33,12 +33,14 @@ class WorkloadGeneratorStateCleaner(ip: String) {
try { try {
val clients = zookeeperClient.getChildren(path, true) val clients = zookeeperClient.getChildren(path, true)
if (clients.isEmpty()){ if (clients.isEmpty()) {
break; break;
} }
} catch (ex: Exception) { } catch (ex: Exception) {
when (ex) { when (ex) {
is KeeperException -> { deleted = true } is KeeperException -> {
deleted = true
}
is InterruptedException -> { is InterruptedException -> {
System.out.println(ex.toString()) System.out.println(ex.toString())
} }
......
package theodolite.k8s
import io.fabric8.kubernetes.api.model.ConfigMap
import io.fabric8.kubernetes.api.model.Service
import io.fabric8.kubernetes.api.model.apps.Deployment
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import mu.KotlinLogging
private val logger = KotlinLogging.logger {}
class YamlLoader(client: NamespacedKubernetesClient) {
var client: NamespacedKubernetesClient
init {
this.client = client
}
/**
* Parses a Service from a servive yaml
* @param path of the yaml file
* @return service from fabric8
*/
fun loadService(path: String): Service? {
val service = loadGenericRessource(path, { x: String -> client.services().load(x).get() })
return service
}
/**
* Parses a Deployment from a Deployment yaml
* @param path of the yaml file
* @return Deployment from fabric8
*/
fun loadDeployment(path: String): Deployment? {
val deployment = loadGenericRessource(path, { x: String -> client.apps().deployments().load(x).get() })
return deployment
}
/**
* Parses a ConfigMap from a ConfigMap yaml
* @param path of the yaml file
* @return ConfigMap from fabric8
*/
fun loadConfigmap(path: String): ConfigMap? {
val configMap = loadGenericRessource(path, { x: String -> client.configMaps().load(x).get() })
return configMap
}
/**
* Generic helper function to load a resource.
* @param path of the resource
* @param f fuction that shall be applied to the resource.
*/
private fun <T> loadGenericRessource(path: String, f: (String) -> T): T? {
var resource: T? = null
try {
resource = f(path)
} catch (e: Exception) {
logger.info("You potentially misspelled the path: $path")
logger.info("$e")
}
return resource
}
}
...@@ -2,7 +2,7 @@ package theodolite.util ...@@ -2,7 +2,7 @@ package theodolite.util
class KafkaBenchmark(config: Map<String, Any>): Benchmark(config) { class KafkaBenchmark(config: Map<String, Any>): Benchmark(config) {
override fun start() { override fun start(load: LoadDimension, resources: Resource) {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
...@@ -10,8 +10,7 @@ class KafkaBenchmark(config: Map<String, Any>): Benchmark(config) { ...@@ -10,8 +10,7 @@ class KafkaBenchmark(config: Map<String, Any>): Benchmark(config) {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
override fun startWorkloadGenerator(wg: String, dimValue: Int, ucId: String) { override fun startWorkloadGenerator(wg: String, load: LoadDimension, ucId: String) {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
} }
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment