Skip to content
Snippets Groups Projects
Commit a10e11de authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch '221-add-delay-before-create-wl' into 'theodolite-kotlin'

Add delay before start the load generator

See merge request !135
parents 48b944d6 d873ecba
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!135Add delay before start the load generator,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Pipeline #2969 passed
Showing
with 49 additions and 27 deletions
...@@ -22,7 +22,7 @@ loadTypes: ...@@ -22,7 +22,7 @@ loadTypes:
- type: "NumSensorsLoadGeneratorReplicaPatcher" - type: "NumSensorsLoadGeneratorReplicaPatcher"
resource: "uc1-load-generator-deployment.yaml" resource: "uc1-load-generator-deployment.yaml"
kafkaConfig: kafkaConfig:
bootstrapServer: "theodolite-cp-kafka:9092" bootstrapServer: "localhost:31290"
topics: topics:
- name: "input" - name: "input"
numPartitions: 40 numPartitions: 40
......
...@@ -17,6 +17,7 @@ execution: ...@@ -17,6 +17,7 @@ execution:
strategy: "LinearSearch" strategy: "LinearSearch"
duration: 300 # in seconds duration: 300 # in seconds
repetitions: 1 repetitions: 1
loadGenerationDelay: 30 # in seconds, optional field, default is 0 seconds
restrictions: restrictions:
- "LowerBound" - "LowerBound"
configOverrides: [] configOverrides: []
......
...@@ -21,6 +21,7 @@ execution: ...@@ -21,6 +21,7 @@ execution:
strategy: "LinearSearch" strategy: "LinearSearch"
duration: 300 # in seconds duration: 300 # in seconds
repetitions: 1 repetitions: 1
delay: 30 # in seconds
restrictions: restrictions:
- "LowerBound" - "LowerBound"
configOverrides: [] configOverrides: []
......
...@@ -21,6 +21,7 @@ interface Benchmark { ...@@ -21,6 +21,7 @@ interface Benchmark {
fun buildDeployment( fun buildDeployment(
load: LoadDimension, load: LoadDimension,
res: Resource, res: Resource,
configurationOverrides: List<ConfigurationOverride?> configurationOverrides: List<ConfigurationOverride?>,
delay: Long
): BenchmarkDeployment ): BenchmarkDeployment
} }
...@@ -47,6 +47,7 @@ class BenchmarkExecution : CustomResource(), Namespaced { ...@@ -47,6 +47,7 @@ class BenchmarkExecution : CustomResource(), Namespaced {
var duration by Delegates.notNull<Long>() var duration by Delegates.notNull<Long>()
var repetitions by Delegates.notNull<Int>() var repetitions by Delegates.notNull<Int>()
lateinit var restrictions: List<String> lateinit var restrictions: List<String>
var loadGenerationDelay = 0L
} }
/** /**
......
...@@ -70,31 +70,36 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced { ...@@ -70,31 +70,36 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
override fun buildDeployment( override fun buildDeployment(
load: LoadDimension, load: LoadDimension,
res: Resource, res: Resource,
configurationOverrides: List<ConfigurationOverride?> configurationOverrides: List<ConfigurationOverride?>,
loadGenerationDelay: Long
): BenchmarkDeployment { ): BenchmarkDeployment {
logger.info { "Using $namespace as namespace." } logger.info { "Using $namespace as namespace." }
logger.info { "Using $path as resource path." } logger.info { "Using $path as resource path." }
val resources = loadKubernetesResources(this.appResource + this.loadGenResource) val appResources = loadKubernetesResources(this.appResource)
val loadGenResources = loadKubernetesResources(this.loadGenResource)
val patcherFactory = PatcherFactory() val patcherFactory = PatcherFactory()
// patch the load dimension the resources // patch the load dimension the resources
load.getType().forEach { patcherDefinition -> load.getType().forEach { patcherDefinition ->
patcherFactory.createPatcher(patcherDefinition, resources).patch(load.get().toString()) patcherFactory.createPatcher(patcherDefinition, loadGenResources).patch(load.get().toString())
} }
res.getType().forEach { patcherDefinition -> res.getType().forEach { patcherDefinition ->
patcherFactory.createPatcher(patcherDefinition, resources).patch(res.get().toString()) patcherFactory.createPatcher(patcherDefinition, appResources).patch(res.get().toString())
} }
// Patch the given overrides // Patch the given overrides
configurationOverrides.forEach { override -> configurationOverrides.forEach { override ->
override?.let { override?.let {
patcherFactory.createPatcher(it.patcher, resources).patch(override.value) patcherFactory.createPatcher(it.patcher, appResources + loadGenResources).patch(override.value)
} }
} }
return KubernetesBenchmarkDeployment( return KubernetesBenchmarkDeployment(
namespace = namespace, namespace = namespace,
resources = resources.map { r -> r.second }, appResources = appResources.map { it.second },
loadGenResources = loadGenResources.map { it.second },
loadGenerationDelay = loadGenerationDelay,
kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer), kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer),
topics = kafkaConfig.topics, topics = kafkaConfig.topics,
client = DefaultKubernetesClient().inNamespace(namespace) client = DefaultKubernetesClient().inNamespace(namespace)
......
...@@ -8,6 +8,7 @@ import org.apache.kafka.clients.admin.NewTopic ...@@ -8,6 +8,7 @@ import org.apache.kafka.clients.admin.NewTopic
import theodolite.k8s.K8sManager import theodolite.k8s.K8sManager
import theodolite.k8s.TopicManager import theodolite.k8s.TopicManager
import theodolite.util.KafkaConfig import theodolite.util.KafkaConfig
import java.time.Duration
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
...@@ -22,7 +23,9 @@ private val logger = KotlinLogging.logger {} ...@@ -22,7 +23,9 @@ private val logger = KotlinLogging.logger {}
@RegisterForReflection @RegisterForReflection
class KubernetesBenchmarkDeployment( class KubernetesBenchmarkDeployment(
val namespace: String, val namespace: String,
val resources: List<KubernetesResource>, val appResources: List<KubernetesResource>,
val loadGenResources: List<KubernetesResource>,
private val loadGenerationDelay: Long,
private val kafkaConfig: HashMap<String, Any>, private val kafkaConfig: HashMap<String, Any>,
private val topics: List<KafkaConfig.TopicWrapper>, private val topics: List<KafkaConfig.TopicWrapper>,
private val client: NamespacedKubernetesClient private val client: NamespacedKubernetesClient
...@@ -41,7 +44,10 @@ class KubernetesBenchmarkDeployment( ...@@ -41,7 +44,10 @@ class KubernetesBenchmarkDeployment(
val kafkaTopics = this.topics.filter { !it.removeOnly } val kafkaTopics = this.topics.filter { !it.removeOnly }
.map { NewTopic(it.name, it.numPartitions, it.replicationFactor) } .map { NewTopic(it.name, it.numPartitions, it.replicationFactor) }
kafkaController.createTopics(kafkaTopics) kafkaController.createTopics(kafkaTopics)
resources.forEach { kubernetesManager.deploy(it) } appResources.forEach { kubernetesManager.deploy(it) }
logger.info { "Wait ${this.loadGenerationDelay} seconds before starting the load generator." }
Thread.sleep(Duration.ofSeconds(this.loadGenerationDelay).toMillis())
loadGenResources.forEach { kubernetesManager.deploy(it) }
} }
/** /**
...@@ -51,9 +57,8 @@ class KubernetesBenchmarkDeployment( ...@@ -51,9 +57,8 @@ class KubernetesBenchmarkDeployment(
* - Remove the [KubernetesResource]s. * - Remove the [KubernetesResource]s.
*/ */
override fun teardown() { override fun teardown() {
resources.forEach { loadGenResources.forEach { kubernetesManager.remove(it) }
kubernetesManager.remove(it) appResources.forEach { kubernetesManager.remove(it) }
}
kafkaController.removeTopics(this.topics.map { topic -> topic.name }) kafkaController.removeTopics(this.topics.map { topic -> topic.name })
KafkaLagExporterRemover(client).remove(LAG_EXPORTER_POD_LABEL) KafkaLagExporterRemover(client).remove(LAG_EXPORTER_POD_LABEL)
logger.info { "Teardown complete. Wait $SLEEP_AFTER_TEARDOWN ms to let everything come down." } logger.info { "Teardown complete. Wait $SLEEP_AFTER_TEARDOWN ms to let everything come down." }
......
...@@ -24,9 +24,10 @@ abstract class BenchmarkExecutor( ...@@ -24,9 +24,10 @@ abstract class BenchmarkExecutor(
val benchmark: Benchmark, val benchmark: Benchmark,
val results: Results, val results: Results,
val executionDuration: Duration, val executionDuration: Duration,
configurationOverrides: List<ConfigurationOverride?>, val configurationOverrides: List<ConfigurationOverride?>,
val slo: BenchmarkExecution.Slo, val slo: BenchmarkExecution.Slo,
val executionId: Int val executionId: Int,
val loadGenerationDelay: Long
) { ) {
var run: AtomicBoolean = AtomicBoolean(true) var run: AtomicBoolean = AtomicBoolean(true)
......
...@@ -18,13 +18,14 @@ class BenchmarkExecutorImpl( ...@@ -18,13 +18,14 @@ class BenchmarkExecutorImpl(
benchmark: Benchmark, benchmark: Benchmark,
results: Results, results: Results,
executionDuration: Duration, executionDuration: Duration,
private val configurationOverrides: List<ConfigurationOverride?>, configurationOverrides: List<ConfigurationOverride?>,
slo: BenchmarkExecution.Slo, slo: BenchmarkExecution.Slo,
executionId: Int executionId: Int,
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo, executionId) { loadGenerationDelay: Long
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo, executionId, loadGenerationDelay) {
override fun runExperiment(load: LoadDimension, res: Resource): Boolean { override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
var result = false var result = false
val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides) val benchmarkDeployment = benchmark.buildDeployment(load, res, configurationOverrides, loadGenerationDelay)
try { try {
benchmarkDeployment.setup() benchmarkDeployment.setup()
......
...@@ -30,7 +30,8 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b ...@@ -30,7 +30,8 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b
benchmark.buildDeployment( benchmark.buildDeployment(
load = LoadDimension(0, emptyList()), load = LoadDimension(0, emptyList()),
res = Resource(0, emptyList()), res = Resource(0, emptyList()),
configurationOverrides = benchmarkExecution.configOverrides configurationOverrides = benchmarkExecution.configOverrides,
loadGenerationDelay = 0L
) )
deployment.teardown() deployment.teardown()
} catch (e: Exception) { } catch (e: Exception) {
......
...@@ -71,7 +71,8 @@ class TheodoliteExecutor( ...@@ -71,7 +71,8 @@ class TheodoliteExecutor(
executionDuration = executionDuration, executionDuration = executionDuration,
configurationOverrides = config.configOverrides, configurationOverrides = config.configOverrides,
slo = config.slos[0], slo = config.slos[0],
executionId = config.executionId executionId = config.executionId,
loadGenerationDelay = config.execution.loadGenerationDelay
) )
return Config( return Config(
......
...@@ -22,6 +22,7 @@ execution: ...@@ -22,6 +22,7 @@ execution:
strategy: "LinearSearch" strategy: "LinearSearch"
duration: 60 duration: 60
repetitions: 1 repetitions: 1
delay: 30 # in seconds
restrictions: restrictions:
- "LowerBound" - "LowerBound"
configOverrides: configOverrides:
......
...@@ -31,7 +31,7 @@ class CompositeStrategyTest { ...@@ -31,7 +31,7 @@ class CompositeStrategyTest {
val results = Results() val results = Results()
val benchmark = TestBenchmark() val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo() val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0) val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0)
val linearSearch = LinearSearch(benchmarkExecutor) val linearSearch = LinearSearch(benchmarkExecutor)
val lowerBoundRestriction = LowerBoundRestriction(results) val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy = val strategy =
...@@ -65,7 +65,7 @@ class CompositeStrategyTest { ...@@ -65,7 +65,7 @@ class CompositeStrategyTest {
val benchmark = TestBenchmark() val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo() val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutorImpl = val benchmarkExecutorImpl =
TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0) TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0)
val binarySearch = BinarySearch(benchmarkExecutorImpl) val binarySearch = BinarySearch(benchmarkExecutorImpl)
val lowerBoundRestriction = LowerBoundRestriction(results) val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy = val strategy =
...@@ -98,7 +98,7 @@ class CompositeStrategyTest { ...@@ -98,7 +98,7 @@ class CompositeStrategyTest {
val results = Results() val results = Results()
val benchmark = TestBenchmark() val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo() val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0) val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0)
val binarySearch = BinarySearch(benchmarkExecutor) val binarySearch = BinarySearch(benchmarkExecutor)
val lowerBoundRestriction = LowerBoundRestriction(results) val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy = val strategy =
......
...@@ -11,7 +11,8 @@ class TestBenchmark : Benchmark { ...@@ -11,7 +11,8 @@ class TestBenchmark : Benchmark {
override fun buildDeployment( override fun buildDeployment(
load: LoadDimension, load: LoadDimension,
res: Resource, res: Resource,
configurationOverrides: List<ConfigurationOverride?> configurationOverrides: List<ConfigurationOverride?>,
loadGenerationDelay: Long
): BenchmarkDeployment { ): BenchmarkDeployment {
return TestBenchmarkDeployment() return TestBenchmarkDeployment()
} }
......
...@@ -13,7 +13,8 @@ class TestBenchmarkExecutorImpl( ...@@ -13,7 +13,8 @@ class TestBenchmarkExecutorImpl(
benchmark: Benchmark, benchmark: Benchmark,
results: Results, results: Results,
slo: BenchmarkExecution.Slo, slo: BenchmarkExecution.Slo,
executionId: Int executionId: Int,
loadGenerationDelay: Long
) : ) :
BenchmarkExecutor( BenchmarkExecutor(
benchmark, benchmark,
...@@ -21,7 +22,8 @@ class TestBenchmarkExecutorImpl( ...@@ -21,7 +22,8 @@ class TestBenchmarkExecutorImpl(
executionDuration = Duration.ofSeconds(1), executionDuration = Duration.ofSeconds(1),
configurationOverrides = emptyList(), configurationOverrides = emptyList(),
slo = slo, slo = slo,
executionId = executionId executionId = executionId,
loadGenerationDelay = loadGenerationDelay
) { ) {
override fun runExperiment(load: LoadDimension, res: Resource): Boolean { override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment