Skip to content
Snippets Groups Projects
Commit 100d41b6 authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

Fix onUpdate bug

parent d38b5350
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!106Introduce a Theodolite operator,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
......@@ -20,10 +20,6 @@ class BenchmarkExecution : CustomResource(), Namespaced {
lateinit var execution: Execution
lateinit var configOverrides: List<ConfigurationOverride?>
fun stop() {
throw InterruptedException()
}
@JsonDeserialize
class Execution : KubernetesResource {
lateinit var strategy: String
......
......@@ -40,7 +40,6 @@ abstract class BenchmarkExecutor(
fun stop() {
run = false
//throw InterruptedException()
}
/**
......
......@@ -28,11 +28,9 @@ class BenchmarkExecutorImpl(
if (this.run) {
result =
AnalysisExecutor(slo = slo).analyse(load = load, res = res, executionDuration = executionDuration)
benchmarkDeployment.teardown()
this.results.setResult(Pair(load, res), result)
}
benchmarkDeployment.teardown()
return result
}
}
......@@ -8,10 +8,9 @@ import theodolite.util.Resource
private val logger = KotlinLogging.logger {}
class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val benchmark: KubernetesBenchmark) :
Thread() {
class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val benchmark: KubernetesBenchmark) {
override fun run() {
fun run() {
// Build Configuration to teardown
logger.info { "Received shutdown signal -> Shutting down" }
val deployment =
......@@ -24,4 +23,4 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b
deployment.teardown()
logger.info { "Teardown completed" }
}
}
}
\ No newline at end of file
......@@ -22,7 +22,7 @@ class TheodoliteController(
val executionContext: CustomResourceDefinitionContext
) {
lateinit var executor: TheodoliteExecutor
val executionsQueue: Queue<BenchmarkExecution> = LinkedList<BenchmarkExecution>()
var executionsQueue: Queue<BenchmarkExecution> = LinkedList<BenchmarkExecution>()
val benchmarks: MutableMap<String, KubernetesBenchmark> = HashMap()
/**
......@@ -38,23 +38,20 @@ class TheodoliteController(
}
override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) {
logger.info { "Update execution ${oldExecution.metadata.name}" }
logger.info { "Add updated execution to queue" }
if (::executor.isInitialized && executor.getExecution().name == newExecution.metadata.name) {
logger.info { "restart current benchmark with new version" }
executor.stop()
sleep(2000)
startBenchmark(execution = newExecution, benchmark = executor.getBenchmark())
} else {
onDelete(oldExecution, false)
onAdd(newExecution)
executor.executor.run = false
}
newExecution.name = newExecution.metadata.name
executionsQueue.removeIf { e -> e.name == newExecution.metadata.name }
executionsQueue.add(newExecution)
}
override fun onDelete(execution: BenchmarkExecution, b: Boolean) {
logger.info { "Delete execution ${execution.metadata.name} from queue" }
executionsQueue.removeIf { e -> e.name == execution.metadata.name }
if (::executor.isInitialized && executor.getExecution().name == execution.metadata.name) {
executor.stop()
executor.executor.run = false
logger.info { "Current benchmark stopped" }
}
}
......@@ -69,12 +66,12 @@ class TheodoliteController(
override fun onUpdate(oldBenchmark: KubernetesBenchmark, newBenchmark: KubernetesBenchmark) {
logger.info { "Update benchmark ${newBenchmark.metadata.name}" }
newBenchmark.name = newBenchmark.metadata.name
if (::executor.isInitialized && executor.getBenchmark().name == oldBenchmark.metadata.name) {
logger.info { "restart current benchmark with new version" }
executor.stop()
sleep(2000)
startBenchmark(execution = executor.getExecution(), benchmark = newBenchmark)
executor.executor.run = false
val execution = executor.getExecution()
execution.name = execution.name + System.currentTimeMillis()
executionsQueue.add(execution)
} else {
onAdd(newBenchmark)
}
......@@ -84,7 +81,7 @@ class TheodoliteController(
logger.info { "Delete benchmark ${benchmark.metadata.name}" }
benchmarks.remove(benchmark.metadata.name)
if (::executor.isInitialized && executor.getBenchmark().name == benchmark.metadata.name) {
executor.stop()
executor.executor.run = false
logger.info { "Current benchmark stopped" }
}
}
......@@ -95,9 +92,9 @@ class TheodoliteController(
while (true) {
try {
reconcile()
logger.info { "Theodolite is waiting for new jobs" }
sleep(2000)
if (this::executor.isInitialized && !executor.isRunning) {
logger.info { "Theodolite is waiting for new jobs" }
sleep(1000)
}
} catch (e: InterruptedException) {
logger.error { "Execution interrupted with error: $e" }
......@@ -108,8 +105,7 @@ class TheodoliteController(
@Synchronized
private fun reconcile() {
while (executionsQueue.isNotEmpty()
&& ((this::executor.isInitialized && !executor.isRunning) || !this::executor.isInitialized)
) {
&& ((this::executor.isInitialized && !executor.isRunning) || !this::executor.isInitialized)) {
val execution = executionsQueue.peek()
val benchmark = benchmarks[execution.benchmark]
......@@ -117,28 +113,19 @@ class TheodoliteController(
logger.debug { "No benchmark found for execution ${execution.benchmark}" }
sleep(1000)
} else {
startBenchmark(execution, benchmark)
runExecution(execution, benchmark)
}
}
}
fun startBenchmark(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) {
@Synchronized
fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) {
logger.info { "Start execution ${execution.name} with benchmark ${benchmark.name}" }
executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark)
executor.run()
// wait until execution is deleted
try {
if (executionsQueue.contains(execution)) {
client.customResource(executionContext).delete(client.namespace, execution.metadata.name)
sleep(1000)
while (executionsQueue.contains(execution)) {
sleep(2000)
logger.info { "Delete of execution: ${execution.name} failed. Retrying in 2 second." }
client.customResource(executionContext).delete(client.namespace, execution.metadata.name)
}
} catch (e: Exception) {
logger.error { "Error while delete current execution: $e" }
}
logger.info { "Execution of ${execution.name} is finally stopped" }
}
......
......@@ -18,19 +18,6 @@ class TheodoliteExecutor(
private var config: BenchmarkExecution,
private var kubernetesBenchmark: KubernetesBenchmark
) {
private val executionThread = Thread() {
if (config == null || kubernetesBenchmark == null) {
logger.error { "Execution or Benchmark not found" }
} else {
val config = buildConfig()
// execute benchmarks for each load
for (load in config.loads) {
config.compositeStrategy.findSuitableResource(load, config.resources)
}
}
isRunning = false
}
var isRunning = false
lateinit var executor: BenchmarkExecutor
......@@ -91,29 +78,16 @@ class TheodoliteExecutor(
fun run() {
isRunning = true
logger.info { "Start thread" }
// executionThread.start()
if (config == null || kubernetesBenchmark == null) {
logger.error { "Execution or Benchmark not found" }
} else {
val config = buildConfig()
// execute benchmarks for each load
for (load in config.loads) {
if (isRunning) {
config.compositeStrategy.findSuitableResource(load, config.resources)
}
val config = buildConfig()
// execute benchmarks for each load
for (load in config.loads) {
if (isRunning) {
config.compositeStrategy.findSuitableResource(load, config.resources)
}
logger.info { "Stop Thread" }
isRunning = false
}
}
fun stop() {
logger.info { "Stop Thread" }
isRunning = false
try {
executor.stop()
Shutdown(config, kubernetesBenchmark).run()
} catch (e: InterruptedException) {
logger.warn { "Execution stopped" }
}
}
}
......@@ -5,6 +5,7 @@ import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark
import theodolite.util.YamlParser
import kotlin.concurrent.thread
import kotlin.system.exitProcess
private val logger = KotlinLogging.logger {}
......@@ -23,12 +24,12 @@ object TheodoliteYamlExecutor {
parser.parse("./../../../resources/main/yaml/BenchmarkType.yaml", KubernetesBenchmark::class.java)!!
val shutdown = Shutdown(benchmarkExecution, benchmark)
Runtime.getRuntime().addShutdownHook(shutdown)
Runtime.getRuntime().addShutdownHook(thread { shutdown.run()})
val executor = TheodoliteExecutor(benchmarkExecution, benchmark)
executor.run()
logger.info { "Theodolite finished" }
Runtime.getRuntime().removeShutdownHook(shutdown)
Runtime.getRuntime().removeShutdownHook(thread { shutdown.run()})
exitProcess(0)
}
}
......@@ -25,4 +25,13 @@ execution:
restrictions:
- "LowerBound"
configOverrides:
-
\ No newline at end of file
- patcher:
type: "NodeSelectorPatcher"
resource: "uc1-load-generator-deployment.yaml"
variableName: "env"
value: "prod"
- patcher:
type: "NodeSelectorPatcher"
resource: "uc1-kstreams-deployment.yaml"
variableName: "env"
value: "prod"
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment