Skip to content
Snippets Groups Projects
Commit 28654b4f authored by Benedikt Wetzel's avatar Benedikt Wetzel Committed by Sören Henning
Browse files

Add delay before start the load generator

parent 48b944d6
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!135Add delay before start the load generator,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing
with 48 additions and 26 deletions
......@@ -17,6 +17,7 @@ execution:
strategy: "LinearSearch"
duration: 300 # in seconds
repetitions: 1
delay: 30 # in seconds
restrictions:
- "LowerBound"
configOverrides: []
......
......@@ -21,6 +21,7 @@ execution:
strategy: "LinearSearch"
duration: 300 # in seconds
repetitions: 1
delay: 30 # in seconds
restrictions:
- "LowerBound"
configOverrides: []
......
......@@ -21,6 +21,7 @@ interface Benchmark {
fun buildDeployment(
load: LoadDimension,
res: Resource,
configurationOverrides: List<ConfigurationOverride?>
configurationOverrides: List<ConfigurationOverride?>,
delay: Long
): BenchmarkDeployment
}
......@@ -47,6 +47,7 @@ class BenchmarkExecution : CustomResource(), Namespaced {
var duration by Delegates.notNull<Long>()
var repetitions by Delegates.notNull<Int>()
lateinit var restrictions: List<String>
var delay by Delegates.notNull<Long>()
}
/**
......
......@@ -70,31 +70,36 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
override fun buildDeployment(
load: LoadDimension,
res: Resource,
configurationOverrides: List<ConfigurationOverride?>
configurationOverrides: List<ConfigurationOverride?>,
delay: Long
): BenchmarkDeployment {
logger.info { "Using $namespace as namespace." }
logger.info { "Using $path as resource path." }
val resources = loadKubernetesResources(this.appResource + this.loadGenResource)
val appResources = loadKubernetesResources(this.appResource)
val loadGenResources = loadKubernetesResources(this.loadGenResource)
val patcherFactory = PatcherFactory()
// patch the load dimension the resources
load.getType().forEach { patcherDefinition ->
patcherFactory.createPatcher(patcherDefinition, resources).patch(load.get().toString())
patcherFactory.createPatcher(patcherDefinition, loadGenResources).patch(load.get().toString())
}
res.getType().forEach { patcherDefinition ->
patcherFactory.createPatcher(patcherDefinition, resources).patch(res.get().toString())
patcherFactory.createPatcher(patcherDefinition, appResources).patch(res.get().toString())
}
// Patch the given overrides
configurationOverrides.forEach { override ->
override?.let {
patcherFactory.createPatcher(it.patcher, resources).patch(override.value)
patcherFactory.createPatcher(it.patcher, appResources + loadGenResources).patch(override.value)
}
}
return KubernetesBenchmarkDeployment(
namespace = namespace,
resources = resources.map { r -> r.second },
appResources = appResources.map { it.second },
loadGenResources = loadGenResources.map { it.second },
delay = delay,
kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer),
topics = kafkaConfig.topics,
client = DefaultKubernetesClient().inNamespace(namespace)
......
......@@ -8,6 +8,7 @@ import org.apache.kafka.clients.admin.NewTopic
import theodolite.k8s.K8sManager
import theodolite.k8s.TopicManager
import theodolite.util.KafkaConfig
import java.time.Duration
private val logger = KotlinLogging.logger {}
......@@ -22,7 +23,9 @@ private val logger = KotlinLogging.logger {}
@RegisterForReflection
class KubernetesBenchmarkDeployment(
val namespace: String,
val resources: List<KubernetesResource>,
val appResources: List<KubernetesResource>,
val loadGenResources: List<KubernetesResource>,
private val delay: Long,
private val kafkaConfig: HashMap<String, Any>,
private val topics: List<KafkaConfig.TopicWrapper>,
private val client: NamespacedKubernetesClient
......@@ -41,7 +44,10 @@ class KubernetesBenchmarkDeployment(
val kafkaTopics = this.topics.filter { !it.removeOnly }
.map { NewTopic(it.name, it.numPartitions, it.replicationFactor) }
kafkaController.createTopics(kafkaTopics)
resources.forEach { kubernetesManager.deploy(it) }
appResources.forEach { kubernetesManager.deploy(it) }
logger.info { "Wait ${this.delay} seconds before starting the workload generator." }
Thread.sleep(Duration.ofSeconds(this.delay).toMillis())
loadGenResources.forEach { kubernetesManager.deploy(it) }
}
/**
......@@ -51,9 +57,8 @@ class KubernetesBenchmarkDeployment(
* - Remove the [KubernetesResource]s.
*/
override fun teardown() {
resources.forEach {
kubernetesManager.remove(it)
}
loadGenResources.forEach { kubernetesManager.remove(it) }
appResources.forEach { kubernetesManager.remove(it) }
kafkaController.removeTopics(this.topics.map { topic -> topic.name })
KafkaLagExporterRemover(client).remove(LAG_EXPORTER_POD_LABEL)
logger.info { "Teardown complete. Wait $SLEEP_AFTER_TEARDOWN ms to let everything come down." }
......
......@@ -24,9 +24,10 @@ abstract class BenchmarkExecutor(
val benchmark: Benchmark,
val results: Results,
val executionDuration: Duration,
configurationOverrides: List<ConfigurationOverride?>,
val configurationOverrides: List<ConfigurationOverride?>,
val slo: BenchmarkExecution.Slo,
val executionId: Int
val executionId: Int,
val delay: Long
) {
var run: AtomicBoolean = AtomicBoolean(true)
......
......@@ -18,13 +18,14 @@ class BenchmarkExecutorImpl(
benchmark: Benchmark,
results: Results,
executionDuration: Duration,
private val configurationOverrides: List<ConfigurationOverride?>,
configurationOverrides: List<ConfigurationOverride?>,
slo: BenchmarkExecution.Slo,
executionId: Int
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo, executionId) {
executionId: Int,
delay: Long
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo, executionId, delay) {
override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
var result = false
val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides)
val benchmarkDeployment = benchmark.buildDeployment(load, res, configurationOverrides, delay)
try {
benchmarkDeployment.setup()
......
......@@ -30,7 +30,8 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b
benchmark.buildDeployment(
load = LoadDimension(0, emptyList()),
res = Resource(0, emptyList()),
configurationOverrides = benchmarkExecution.configOverrides
configurationOverrides = benchmarkExecution.configOverrides,
delay = 0L
)
deployment.teardown()
} catch (e: Exception) {
......
......@@ -71,7 +71,8 @@ class TheodoliteExecutor(
executionDuration = executionDuration,
configurationOverrides = config.configOverrides,
slo = config.slos[0],
executionId = config.executionId
executionId = config.executionId,
delay = config.execution.delay
)
return Config(
......
......@@ -22,6 +22,7 @@ execution:
strategy: "LinearSearch"
duration: 60
repetitions: 1
delay: 30 # in seconds
restrictions:
- "LowerBound"
configOverrides:
......
......@@ -31,7 +31,7 @@ class CompositeStrategyTest {
val results = Results()
val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0)
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0)
val linearSearch = LinearSearch(benchmarkExecutor)
val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy =
......@@ -65,7 +65,7 @@ class CompositeStrategyTest {
val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutorImpl =
TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0)
TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0)
val binarySearch = BinarySearch(benchmarkExecutorImpl)
val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy =
......@@ -98,7 +98,7 @@ class CompositeStrategyTest {
val results = Results()
val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0)
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0)
val binarySearch = BinarySearch(benchmarkExecutor)
val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy =
......
......@@ -11,7 +11,8 @@ class TestBenchmark : Benchmark {
override fun buildDeployment(
load: LoadDimension,
res: Resource,
configurationOverrides: List<ConfigurationOverride?>
configurationOverrides: List<ConfigurationOverride?>,
delay: Long
): BenchmarkDeployment {
return TestBenchmarkDeployment()
}
......
......@@ -13,7 +13,8 @@ class TestBenchmarkExecutorImpl(
benchmark: Benchmark,
results: Results,
slo: BenchmarkExecution.Slo,
executionId: Int
executionId: Int,
delay: Long
) :
BenchmarkExecutor(
benchmark,
......@@ -21,7 +22,8 @@ class TestBenchmarkExecutorImpl(
executionDuration = Duration.ofSeconds(1),
configurationOverrides = emptyList(),
slo = slo,
executionId = executionId
executionId = executionId,
delay = delay
) {
override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment