Commit c65ff2df authored by Sören Henning

Merge branch 'theodolite-kotlin' into wetzel/spesb-198-update-k8s-theodolite-job

parents b18f8a47 a5691ef9
4 merge requests: !159 Re-implementation of Theodolite with Kotlin/Quarkus, !157 Update Graal Image in CI pipeline, !122 Update Theodolite Kubernetes Job, !83 WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing changed files with 251 additions and 88 deletions
@@ -242,6 +242,7 @@ build-theodolite-native:
   script:
     - gu install native-image # TODO move to image
     - ./gradlew --build-cache assemble -Dquarkus.package.type=native
+  when: manual
   artifacts:
     paths:
       - "theodolite-quarkus/build/*-runner"
@@ -252,7 +253,7 @@ test-theodolite:
   extends: .theodolite
   needs:
     - build-theodolite-jvm
-    - build-theodolite-native
+    #- build-theodolite-native
   script: ./gradlew test --stacktrace
 
 # Disabled for now
@@ -279,12 +280,13 @@ deploy-theodolite:
     - .theodolite
     - .dind
   needs:
-    - build-theodolite-native
+    #- build-theodolite-native
     - build-theodolite-jvm
     - test-theodolite
   script:
     - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
-    - docker build -f src/main/docker/Dockerfile.native -t theodolite .
-    #- docker build -f src/main/docker/Dockerfile.jvm -t theodolite .
+    #- docker build -f src/main/docker/Dockerfile.native -t theodolite .
+    - docker build -f src/main/docker/Dockerfile.jvm -t theodolite .
     - "[ ! $CI_COMMIT_TAG ] && docker tag theodolite $CR_HOST/$CR_ORG/theodolite:${DOCKER_TAG_NAME}latest"
     - "[ ! $CI_COMMIT_TAG ] && docker tag theodolite $CR_HOST/$CR_ORG/theodolite:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"
     - "[ $CI_COMMIT_TAG ] && docker tag theodolite $CR_HOST/$CR_ORG/theodolite:$CI_COMMIT_TAG"
@@ -309,6 +311,7 @@ deploy-slo-checker-lag-trend:
   stage: deploy
   extends:
     - .dind
+  needs: []
   script:
     - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
     - docker build --pull -t theodolite-slo-checker-lag-trend slope-evaluator
@@ -335,6 +338,7 @@ deploy-random-scheduler:
   stage: deploy
   extends:
     - .dind
+  needs: []
   script:
     - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
     - docker build --pull -t theodolite-random-scheduler execution/infrastructure/random-scheduler
@@ -23,4 +23,37 @@ Run the Docker image:
 ## Configuration
 You can set the `HOST` and the `PORT` (and many more parameters) via environment variables. Default is `0.0.0.0:80`.
-For more information see [here](https://github.com/tiangolo/uvicorn-gunicorn-fastapi-docker#advanced-usage).
+For more information, see the [Gunicorn/FastAPI Docker docs](https://github.com/tiangolo/uvicorn-gunicorn-fastapi-docker#advanced-usage).
+
+## API Documentation
+The running webserver provides a REST API with the following route:
+
+* /evaluate-slope
+  * Method: POST
+  * Body:
+    * total_lag
+    * threshold
+    * warmup
+
+The body of the request must be a JSON string that satisfies the following conditions:
+
+* **total_lag**: This property is based on the [Range Vector type](https://www.prometheus.io/docs/prometheus/latest/querying/api/#range-vectors) from Prometheus and must have the following JSON structure:
+  ```
+  {
+    "metric": {
+      "group": "<label_value>"
+    },
+    "values": [
+      [
+        <unix_timestamp>,
+        "<sample_value>"
+      ]
+    ]
+  }
+  ```
+  * The `<label_value>` provided in "metric.group" must be equal to the ID of the Kafka consumer group.
+  * The `<unix_timestamp>` provided as the first element of each element in the "values" array must be the timestamp of the measurement value in seconds (with optional decimal precision).
+  * The `<sample_value>` must be the measurement value as a string.
+* **threshold**: Must be an unsigned integer that specifies the threshold for the SLO evaluation. The SLO is considered fulfilled if the result value is below the threshold; if the result value is equal to or above the threshold, the SLO is considered not fulfilled.
+* **warmup**: Specifies the warmup time in seconds that is ignored for evaluating the SLO.
\ No newline at end of file
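
For illustration, a request against a running checker could look like this. This is a minimal sketch: the host and port, the consumer group name, and the lag samples are all made-up assumptions, not values from this repository.

```python
import requests  # third-party HTTP client, assumed to be installed

# Hypothetical example: one consumer group with three lag samples,
# evaluated with a threshold of 100 and a warmup of 60 seconds.
body = {
    "total_lag": [
        {
            "metric": {"group": "example-consumer-group"},
            "values": [
                [1604325120.0, "100"],
                [1604325150.0, "150"],
                [1604325180.0, "200"],
            ],
        }
    ],
    "threshold": 100,
    "warmup": 60,
}

response = requests.post("http://localhost:80/evaluate-slope", json=body)
print(response.text)  # "true" if the SLO is fulfilled, otherwise "false"
```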
@@ -18,7 +18,7 @@ if os.getenv('LOG_LEVEL') == 'INFO':
 elif os.getenv('LOG_LEVEL') == 'WARNING':
     logger.setLevel(logging.WARNING)
 elif os.getenv('LOG_LEVEL') == 'DEBUG':
-    logger.setLevel((logging.DEBUG))
+    logger.setLevel(logging.DEBUG)
 
 def execute(results, threshold, warmup):
     d = []
@@ -30,18 +30,18 @@ def execute(results, threshold, warmup):
     df = pd.DataFrame(d)
 
-    logger.info(df)
+    logger.info("Calculating trend slope with warmup of %s seconds for data frame:\n %s", warmup, df)
     try:
         trend_slope = trend_slope_computer.compute(df, warmup)
     except Exception as e:
-        err_msg = 'Computing trend slope failed'
+        err_msg = 'Computing trend slope failed.'
         logger.exception(err_msg)
-        logger.error('Mark this subexperiment as not successful and continue benchmark')
+        logger.error('Mark this subexperiment as not successful and continue benchmark.')
         return False
 
-    logger.info("Trend Slope: %s", trend_slope)
-    return trend_slope < threshold
+    result = trend_slope < threshold
+    logger.info("Computed lag trend slope is '%s'. Result is: %s", trend_slope, result)
+    return result
 
 @app.post("/evaluate-slope", response_model=bool)
 async def evaluate_slope(request: Request):
@@ -2,13 +2,12 @@ from sklearn.linear_model import LinearRegression
 import pandas as pd
 import os
 
-def compute(x, warmup_sec):
-    input = x
-    input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp']
-    regress = input.loc[input['sec_start'] >= warmup_sec]  # Warm-Up
+def compute(data, warmup_sec):
+    data['sec_start'] = data.loc[0:, 'timestamp'] - data.iloc[0]['timestamp']
+    regress = data.loc[data['sec_start'] >= warmup_sec]  # Warm-Up
 
-    X = regress.iloc[:, 2].values.reshape(-1, 1)  # values converts it into a numpy array
-    Y = regress.iloc[:, 3].values.reshape(-1, 1)  # -1 means to calculate the number of rows, but have 1 column
+    X = regress.iloc[:, 1].values.reshape(-1, 1)  # values converts it into a numpy array
+    Y = regress.iloc[:, 2].values.reshape(-1, 1)  # -1 means to calculate the number of rows, but have 1 column
 
     linear_regressor = LinearRegression()  # create object for the class
     linear_regressor.fit(X, Y)  # perform linear regression
     Y_pred = linear_regressor.predict(X)  # make predictions
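To make the computation above concrete, here is a minimal, self-contained sketch of the same approach. It assumes pandas and scikit-learn are installed and uses the group/timestamp/value column layout written by the CSV exporter; the sample values are made up:

```python
import pandas as pd
from sklearn.linear_model import LinearRegression

# Hypothetical lag samples: (timestamp in seconds, total consumer lag).
df = pd.DataFrame({
    "group": ["example-consumer-group"] * 4,
    "timestamp": [0.0, 30.0, 60.0, 90.0],
    "value": [100.0, 150.0, 210.0, 260.0],
})

warmup_sec = 30
df["sec_start"] = df["timestamp"] - df.iloc[0]["timestamp"]
regress = df.loc[df["sec_start"] >= warmup_sec]  # drop warm-up samples

X = regress["timestamp"].values.reshape(-1, 1)
Y = regress["value"].values.reshape(-1, 1)
model = LinearRegression().fit(X, Y)

# The slope of the regression line is the lag trend in messages per second;
# the SLO checker compares it against the configured threshold.
print(model.coef_[0][0])  # ~1.83 for these samples
```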
@@ -18,16 +18,16 @@ dependencies {
     implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
     implementation 'io.quarkus:quarkus-arc'
     implementation 'io.quarkus:quarkus-resteasy'
-    testImplementation 'io.quarkus:quarkus-junit5'
-    testImplementation 'io.rest-assured:rest-assured'
     implementation 'com.google.code.gson:gson:2.8.5'
     implementation 'org.slf4j:slf4j-simple:1.7.29'
     implementation 'io.github.microutils:kotlin-logging:1.12.0'
     implementation 'io.fabric8:kubernetes-client:5.0.0-alpha-2'
     implementation 'io.quarkus:quarkus-kubernetes-client'
     implementation 'org.apache.kafka:kafka-clients:2.7.0'
     implementation 'khttp:khttp:1.0.0'
+    testImplementation 'io.quarkus:quarkus-junit5'
+    testImplementation 'io.rest-assured:rest-assured'
 }
 
 group 'theodolite'
@@ -19,9 +19,13 @@ loadTypes:
       resource: "uc1-load-generator-deployment.yaml"
       container: "workload-generator"
       variableName: "NUM_SENSORS"
+    - type: "NumSensorsLoadGeneratorReplicaPatcher"
+      resource: "uc1-load-generator-deployment.yaml"
 kafkaConfig:
   bootstrapServer: "theodolite-cp-kafka:9092"
   topics:
     - name: "input"
       numPartitions: 40
       replicationFactor: 1
+    - name: "theodolite-.*"
+      removeOnly: True
\ No newline at end of file
@@ -23,9 +23,13 @@ loadTypes:
       resource: "uc1-load-generator-deployment.yaml"
       container: "workload-generator"
       variableName: "NUM_SENSORS"
+    - type: "NumSensorsLoadGeneratorReplicaPatcher"
+      resource: "uc1-load-generator-deployment.yaml"
 kafkaConfig:
   bootstrapServer: "theodolite-cp-kafka:9092"
   topics:
     - name: "input"
       numPartitions: 40
       replicationFactor: 1
+    - name: "theodolite-.*"
+      removeOnly: True
\ No newline at end of file
@@ -96,7 +96,7 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
             namespace = namespace,
             resources = resources.map { r -> r.second },
             kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer),
-            topics = kafkaConfig.getKafkaTopics(),
+            topics = kafkaConfig.topics,
             client = DefaultKubernetesClient().inNamespace(namespace)
         )
     }
@@ -3,9 +3,13 @@ package theodolite.benchmark
 import io.fabric8.kubernetes.api.model.KubernetesResource
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient
 import io.quarkus.runtime.annotations.RegisterForReflection
+import mu.KotlinLogging
 import org.apache.kafka.clients.admin.NewTopic
 import theodolite.k8s.K8sManager
 import theodolite.k8s.TopicManager
+import theodolite.util.KafkaConfig
+
+private val logger = KotlinLogging.logger {}
 
 /**
  * Organizes the deployment of benchmarks in Kubernetes.
@@ -20,12 +24,13 @@ class KubernetesBenchmarkDeployment(
     val namespace: String,
     val resources: List<KubernetesResource>,
     private val kafkaConfig: HashMap<String, Any>,
-    private val topics: Collection<NewTopic>,
+    private val topics: List<KafkaConfig.TopicWrapper>,
     private val client: NamespacedKubernetesClient
 ) : BenchmarkDeployment {
     private val kafkaController = TopicManager(this.kafkaConfig)
     private val kubernetesManager = K8sManager(client)
-    private val LABEL = "app.kubernetes.io/name=kafka-lag-exporter"
+    private val LAG_EXPORTER_POD_LABEL = "app.kubernetes.io/name=kafka-lag-exporter"
+    private val SLEEP_AFTER_TEARDOWN = 5000L
 
     /**
      * Setup a [KubernetesBenchmark] using the [TopicManager] and the [K8sManager]:
@@ -33,10 +38,10 @@
      * - Deploy the needed resources.
      */
     override fun setup() {
-        kafkaController.createTopics(this.topics)
-        resources.forEach {
-            kubernetesManager.deploy(it)
-        }
+        val kafkaTopics = this.topics.filter { !it.removeOnly }
+            .map { NewTopic(it.name, it.numPartitions, it.replicationFactor) }
+        kafkaController.createTopics(kafkaTopics)
+        resources.forEach { kubernetesManager.deploy(it) }
     }
 
     /**
@@ -46,10 +51,12 @@
      * - Remove the [KubernetesResource]s.
      */
     override fun teardown() {
-        KafkaLagExporterRemover(client).remove(LABEL)
-        kafkaController.removeTopics(this.topics.map { topic -> topic.name() })
         resources.forEach {
             kubernetesManager.remove(it)
         }
+        kafkaController.removeTopics(this.topics.map { topic -> topic.name })
+        KafkaLagExporterRemover(client).remove(LAG_EXPORTER_POD_LABEL)
+        logger.info { "Teardown complete. Wait $SLEEP_AFTER_TEARDOWN ms to let everything come down." }
+        Thread.sleep(SLEEP_AFTER_TEARDOWN)
     }
 }
@@ -4,8 +4,11 @@ import mu.KotlinLogging
 import theodolite.benchmark.BenchmarkExecution
 import theodolite.util.LoadDimension
 import theodolite.util.Resource
+import java.text.Normalizer
 import java.time.Duration
 import java.time.Instant
+import java.util.*
+import java.util.regex.Pattern
 
 private val logger = KotlinLogging.logger {}
@@ -46,7 +49,10 @@ class AnalysisExecutor(
             resultsFolder += "/"
         }
 
-        CsvExporter().toCsv(name = "$resultsFolder$executionId-${load.get()}-${res.get()}-${slo.sloType}", prom = prometheusData)
+        CsvExporter().toCsv(
+            name = "${resultsFolder}exp${executionId}_${load.get()}_${res.get()}_${slo.sloType.toSlug()}",
+            prom = prometheusData
+        )
         val sloChecker = SloCheckerFactory().create(
             sloType = slo.sloType,
             externalSlopeURL = slo.externalSloUrl,
@@ -60,8 +66,18 @@
             )
         } catch (e: Exception) {
-            logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" }
+            logger.error { "Evaluation failed for resource '${res.get()}' and load '${load.get()}'. Error: $e" }
         }
 
         return result
     }
+
+    private val NONLATIN: Pattern = Pattern.compile("[^\\w-]")
+    private val WHITESPACE: Pattern = Pattern.compile("[\\s]")
+
+    fun String.toSlug(): String {
+        val noWhitespace: String = WHITESPACE.matcher(this).replaceAll("-")
+        val normalized: String = Normalizer.normalize(noWhitespace, Normalizer.Form.NFD)
+        val slug: String = NONLATIN.matcher(normalized).replaceAll("")
+        return slug.toLowerCase(Locale.ENGLISH)
+    }
 }
@@ -24,12 +24,12 @@ class CsvExporter {
         val csvOutputFile = File("$name.csv")
 
         PrintWriter(csvOutputFile).use { pw ->
-            pw.println(listOf("name", "time", "value").joinToString())
+            pw.println(listOf("group", "timestamp", "value").joinToString())
             responseArray.forEach {
                 pw.println(it.joinToString())
             }
         }
-        logger.info { "Wrote csv file: $name to ${csvOutputFile.absolutePath}" }
+        logger.info { "Wrote CSV file: $name to ${csvOutputFile.absolutePath}." }
     }
 
     /**
@@ -41,9 +41,11 @@
         val dataList = mutableListOf<List<String>>()
         if (values != null) {
-            for (x in values) {
-                val y = x as List<*>
-                dataList.add(listOf(name, "${y[0]}", "${y[1]}"))
+            for (maybeValuePair in values) {
+                val valuePair = maybeValuePair as List<*>
+                val timestamp = (valuePair[0] as Double).toLong().toString()
+                val value = valuePair[1].toString()
+                dataList.add(listOf(name, timestamp, value))
             }
         }
         return Collections.unmodifiableList(dataList)
@@ -17,8 +17,7 @@ class ExternalSloChecker(
     private val externalSlopeURL: String,
     private val threshold: Int,
     private val warmup: Int
-) :
-    SloChecker {
+) : SloChecker {
     private val RETRIES = 2
     private val TIMEOUT = 60.0
@@ -38,19 +37,23 @@
      */
     override fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean {
         var counter = 0
-        val data =
-            Gson().toJson(mapOf("total_lag" to fetchedData.data?.result, "threshold" to threshold, "warmup" to warmup))
+        val data = Gson().toJson(mapOf(
+            "total_lag" to fetchedData.data?.result,
+            "threshold" to threshold,
+            "warmup" to warmup))
 
         while (counter < RETRIES) {
             val result = post(externalSlopeURL, data = data, timeout = TIMEOUT)
             if (result.statusCode != 200) {
                 counter++
-                logger.error { "Could not reach external slope analysis" }
+                logger.error { "Could not reach external SLO checker" }
             } else {
-                return result.text.toBoolean()
+                val booleanResult = result.text.toBoolean()
+                logger.info { "SLO checker result is: $booleanResult" }
+                return booleanResult
             }
         }
 
-        throw ConnectException("Could not reach slope evaluation")
+        throw ConnectException("Could not reach external SLO checker")
     }
 }
@@ -48,18 +48,18 @@ class MetricFetcher(private val prometheusURL: String, private val offset: Duration
             val response = get("$prometheusURL/api/v1/query_range", params = parameter, timeout = TIMEOUT)
             if (response.statusCode != 200) {
                 val message = response.jsonObject.toString()
-                logger.warn { "Could not connect to Prometheus: $message, retrying now" }
+                logger.warn { "Could not connect to Prometheus: $message. Retrying now." }
                 counter++
             } else {
                 val values = parseValues(response)
                 if (values.data?.result.isNullOrEmpty()) {
-                    logger.error { "Empty query result: $values between $start and $end for querry $query" }
+                    logger.error { "Empty query result: $values between $start and $end for query $query." }
                     throw NoSuchFieldException()
                 }
                 return parseValues(response)
             }
         }
-        throw ConnectException("No answer from Prometheus received")
+        throw ConnectException("No answer from Prometheus received.")
     }
 
     /**
@@ -47,7 +47,7 @@ abstract class BenchmarkExecutor(
      *
      */
     fun waitAndLog() {
-        logger.info { "Execution of a new benchmark started." }
+        logger.info { "Execution of a new experiment started." }
 
         var secondsRunning = 0L
@@ -30,7 +30,7 @@ class BenchmarkExecutorImpl(
             benchmarkDeployment.setup()
             this.waitAndLog()
         } catch (e: Exception) {
-            logger.error { "Error while setup experiment." }
+            logger.error { "Error while setting up experiment with id ${this.executionId}." }
             logger.error { "Error is: $e" }
             this.run.set(false)
         }
@@ -39,11 +39,20 @@
          * Analyse the experiment, if [run] is true, otherwise the experiment was canceled by the user.
          */
         if (this.run.get()) {
-            result =
-                AnalysisExecutor(slo = slo, executionId = executionId).analyze(load = load, res = res, executionDuration = executionDuration)
+            result = AnalysisExecutor(slo = slo, executionId = executionId).analyze(
+                load = load,
+                res = res,
+                executionDuration = executionDuration
+            )
             this.results.setResult(Pair(load, res), result)
         }
 
+        try {
+            benchmarkDeployment.teardown()
+        } catch (e: Exception) {
+            logger.warn { "Error while tearing down the benchmark deployment." }
+            logger.debug { "Teardown failed, caused by: $e" }
+        }
+
         return result
     }
@@ -13,13 +13,17 @@ object Main {
     @JvmStatic
     fun main(args: Array<String>) {
-        val mode = System.getenv("MODE") ?: "yaml-executor"
+        val mode = System.getenv("MODE") ?: "standalone"
         logger.info { "Start Theodolite with mode $mode" }
 
         when(mode) {
-            "yaml-executor" -> TheodoliteYamlExecutor().start()
+            "standalone" -> TheodoliteYamlExecutor().start()
+            "yaml-executor" -> TheodoliteYamlExecutor().start() // TODO remove (#209)
             "operator" -> TheodoliteOperator().start()
-            else -> {logger.error { "MODE $mode not found" }; exitProcess(1)}
+            else -> {
+                logger.error { "MODE $mode not found" }
+                exitProcess(1)
+            }
         }
     }
 }
\ No newline at end of file
@@ -6,6 +6,9 @@ import io.fabric8.kubernetes.api.model.Service
 import io.fabric8.kubernetes.api.model.apps.Deployment
 import io.fabric8.kubernetes.api.model.apps.StatefulSet
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient
+import mu.KotlinLogging
+
+private val logger = KotlinLogging.logger {}
 
 /**
  * This class is used to deploy or remove different Kubernetes resources.
@@ -39,16 +42,32 @@ class K8sManager(private val client: NamespacedKubernetesClient) {
      */
     fun remove(resource: KubernetesResource) {
         when (resource) {
-            is Deployment ->
+            is Deployment -> {
+                val label = resource.spec.selector.matchLabels["app"]!!
                 this.client.apps().deployments().delete(resource)
+                blockUntilPodsDeleted(label)
+                logger.info { "Deployment '${resource.metadata.name}' deleted." }
+            }
             is Service ->
                 this.client.services().delete(resource)
             is ConfigMap ->
                 this.client.configMaps().delete(resource)
-            is StatefulSet ->
+            is StatefulSet -> {
+                val label = resource.spec.selector.matchLabels["app"]!!
                 this.client.apps().statefulSets().delete(resource)
+                blockUntilPodsDeleted(label)
+                logger.info { "StatefulSet '${resource.metadata.name}' deleted." }
+            }
             is ServiceMonitorWrapper -> resource.delete(client)
             else -> throw IllegalArgumentException("Unknown Kubernetes resource.")
         }
     }
+
+    private fun blockUntilPodsDeleted(podLabel: String) {
+        while (!this.client.pods().withLabel(podLabel).list().items.isNullOrEmpty()) {
+            logger.info { "Wait for pods with label '$podLabel' to be deleted." }
+            Thread.sleep(1000)
+        }
+    }
 }
@@ -2,28 +2,50 @@ package theodolite.k8s
 import mu.KotlinLogging
 import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.admin.CreateTopicsResult
 import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.common.errors.TopicExistsException
+import java.lang.Thread.sleep
 
 private val logger = KotlinLogging.logger {}
+private const val RETRY_TIME = 2000L
 
 /**
  * Manages the topic-related tasks.
- * @param kafkaConfig Kafka Configuration as HashMap
+ * @param kafkaConfig Kafka configuration as a Map
  * @constructor Creates a KafkaAdminClient
  */
-class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
+class TopicManager(private val kafkaConfig: Map<String, Any>) {
 
     /**
-     * Creates topics.
-     * @param newTopics List of all Topic that should be created
+     * Create topics.
+     * @param newTopics Collection of all topics that should be created
      */
     fun createTopics(newTopics: Collection<NewTopic>) {
-        var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
-        val result = kafkaAdmin.createTopics(newTopics)
-        result.all().get() // wait for the future object
+        val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
+        lateinit var result: CreateTopicsResult
+
+        do {
+            var retryCreation = false
+            try {
+                result = kafkaAdmin.createTopics(newTopics)
+                result.all().get() // wait for the future to be completed
+            } catch (e: Exception) { // TopicExistsException
+                logger.warn(e) { "Error during topic creation." }
+                logger.debug { e } // TODO remove due to attached exception to warn log?
+                logger.info { "Remove existing topics." }
+                delete(newTopics.map { topic -> topic.name() }, kafkaAdmin)
+                logger.info { "Will retry the topic creation in ${RETRY_TIME / 1000} seconds." }
+                sleep(RETRY_TIME)
+                retryCreation = true
+            }
+        } while (retryCreation)
+
         logger.info {
-            "Topics created finished with result: ${
-                result.values().map { it -> it.key + ": " + it.value.isDone }
+            "Topic creation finished with result: ${
+                result
+                    .values()
+                    .map { it -> it.key + ": " + it.value.isDone }
                 .joinToString(separator = ",")
             } "
         }
@@ -31,14 +53,40 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
     }
 
     /**
-     * Removes topics.
-     * @param topics List of names with the topics to remove.
+     * Remove topics.
+     * @param topics Collection of names for the topics to remove.
      */
     fun removeTopics(topics: List<String>) {
-        var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
+        val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
+        val currentTopics = kafkaAdmin.listTopics().names().get()
+        delete(currentTopics.filter { matchRegex(it, topics) }, kafkaAdmin)
+        kafkaAdmin.close()
+    }
+
+    /**
+     * Checks whether `existingTopic` is fully matched by at least one regular expression in `topics`.
+     *
+     * @param existingTopic string to check
+     * @param topics list of strings used as regular expressions
+     * @return true if `existingTopic` matches one of the regular expressions, else false
+     */
+    private fun matchRegex(existingTopic: String, topics: List<String>): Boolean {
+        for (t in topics) {
+            val regex = t.toRegex()
+            if (regex.matches(existingTopic)) {
+                return true
+            }
+        }
+        return false
+    }
+
+    private fun delete(topics: List<String>, kafkaAdmin: AdminClient) {
+        var deleted = false
+
+        while (!deleted) {
+            try {
                 val result = kafkaAdmin.deleteTopics(topics)
-                result.all().get() // wait for the future object
+                result.all().get() // wait for the future to be completed
                 logger.info {
                     "Topics deletion finished with result: ${
                         result.values().map { it -> it.key + ": " + it.value.isDone }
@@ -46,9 +94,19 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
                     }"
                 }
             } catch (e: Exception) {
-                logger.error { "Error while removing topics: $e" }
-                logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." }
+                logger.error(e) { "Error while removing topics: $e" }
+                logger.info { "Existing topics are: ${kafkaAdmin.listTopics().names().get()}." }
             }
+
+            val toDelete = topics.filter { kafkaAdmin.listTopics().names().get().contains(it) }
+
+            if (toDelete.isNullOrEmpty()) {
+                deleted = true
+            } else {
+                logger.info { "Deletion of Kafka topics failed, will retry in ${RETRY_TIME / 1000} seconds." }
+                sleep(RETRY_TIME)
+            }
-            kafkaAdmin.close()
         }
     }
 }
@@ -12,7 +12,7 @@ import io.fabric8.kubernetes.api.model.KubernetesResource
 * @param variableName *(optional)* The variable name to be patched
 *
 *
- * **For example** to patch the load dimension of a workload generator, the Patcher should be created as follow:
+ * **For example**, to patch the load dimension of a load generator, the patcher should be created as follows:
 *
 * k8sResource: `uc-1-workload-generator.yaml`
 * container: `workload`
@@ -2,7 +2,6 @@ package theodolite.patcher
 import io.fabric8.kubernetes.api.model.Container
 import io.fabric8.kubernetes.api.model.EnvVar
-import io.fabric8.kubernetes.api.model.EnvVarSource
 import io.fabric8.kubernetes.api.model.KubernetesResource
 import io.fabric8.kubernetes.api.model.apps.Deployment
@@ -39,7 +38,9 @@ class EnvVarPatcher(
         val x = container.env.filter { envVar -> envVar.name == k }
 
         if (x.isEmpty()) {
-            val newVar = EnvVar(k, v, EnvVarSource())
+            val newVar = EnvVar()
+            newVar.name = k
+            newVar.value = v
             container.env.add(newVar)
         } else {
             x.forEach {