Skip to content
Snippets Groups Projects
Commit d41f4c90 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Merge branch '109-implement-kotlin-prototype' of...

Merge branch '109-implement-kotlin-prototype' of git.se.informatik.uni-kiel.de:stu200776/spesb into 109-implement-kotlin-prototype
parents 8bf355d3 f1e49211
Branches
Tags
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus,!78Resolve "Implement Quarkus/Kotlin protype"
Showing
with 114 additions and 79 deletions
......@@ -13,7 +13,7 @@ class KafkaBenchmarkExecutor(benchmark: Benchmark, results: Results, executionDu
override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
benchmark.start(load, res)
this.waitAndLog()
benchmark.stop()
benchmark.clearClusterEnvironment()
// todo evaluate
val result = false // if success else false
this.results.setResult(Pair(load, res), result)
......
package theodolite.execution
import mu.KotlinLogging
import theodolite.execution.BenchmarkExecutor
import theodolite.util.Benchmark
import theodolite.util.LoadDimension
......@@ -13,8 +14,6 @@ class TestBenchmarkExecutor(private val mockResults: Array<Array<Boolean>>, benc
override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
val result = this.mockResults[load.get()][res.get()]
System.out.println("load :" + load.get().toString() + ", res: " + res.get().toString() + ", res: " + result)
this.results.setResult(Pair(load, res), result)
return result;
}
......
package theodolite.execution
import mu.KotlinLogging
import theodolite.k8s.UC1Benchmark
import theodolite.strategies.restriction.LowerBoundRestriction
import theodolite.strategies.searchstrategy.CompositeStrategy
import theodolite.strategies.searchstrategy.LinearSearch
import theodolite.util.*
import java.time.Duration
private val logger = KotlinLogging.logger {}
class TheodoliteExecutor() {
private fun loadConfig(): Config {
val benchmark: KafkaBenchmark = KafkaBenchmark(emptyMap())
val benchmark: UC1Benchmark = UC1Benchmark(
UC1Benchmark.UC1BenchmarkConfig(
zookeeperConnectionString = "my-confluent-cp-zookeeper:2181",
kafkaIPConnectionString = "my-confluent-cp-kafka:9092",
schemaRegistryConnectionString = "http://my-confluent-cp-schema-registry:8081",
kafkaPartition = 40,
kafkaReplication = 3,
kafkaTopics = listOf("input", "output"),
// TODO("handle path in a more nice way (not absolut)")
ucDeploymentPath = "src/main/resources/yaml/aggregation-deployment.yaml",
ucServicePath = "src/main/resources/yaml/aggregation-service.yaml",
wgDeploymentPath = "src/main/resources/yaml/workloadGenerator.yaml",
ucImageURL = "ghcr.io/cau-se/theodolite-uc1-kstreams-app:latest",
wgImageURL = "ghcr.io/cau-se/theodolite-uc1-kstreams-workload-generator:latest"
))
val results: Results = Results()
val executionDuration = Duration.ofSeconds(60*5)
val executor: BenchmarkExecutor = KafkaBenchmarkExecutor(benchmark, results, executionDuration)
......@@ -30,7 +48,7 @@ class TheodoliteExecutor() {
// execute benchmarks for each load
for(load in config.loads) {
config.compositeStrategy.findSuitableResources(load, config.resources)
config.compositeStrategy.findSuitableResource(load, config.resources)
}
}
......
package theodolite.k8s
import mu.KotlinLogging
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.ListTopicsResult
import org.apache.kafka.clients.admin.NewTopic
private val logger = KotlinLogging.logger {}
class TopicManager(boostrapIp: String) {
val props = hashMapOf<String, Any>(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to boostrapIp)
lateinit var kafkaAdmin: AdminClient
......@@ -13,7 +16,7 @@ class TopicManager(boostrapIp: String) {
try {
kafkaAdmin = AdminClient.create(props)
} catch (e: Exception) {
System.out.println(e.toString())
logger.error {e.toString()}
}
}
......@@ -25,7 +28,7 @@ class TopicManager(boostrapIp: String) {
newTopics.add(tops)
}
kafkaAdmin.createTopics(newTopics)
System.out.println("Topics created")
logger.info {"Topics created"}
}
fun createTopics(topics: List<String>, numPartitions: Int, replicationfactor: Short) {
......@@ -36,7 +39,7 @@ class TopicManager(boostrapIp: String) {
newTopics.add(tops)
}
kafkaAdmin.createTopics(newTopics)
System.out.println("Creation of $topics started")
logger.info {"Creation of $topics started"}
}
fun deleteTopics(topics: List<String>) {
......@@ -46,9 +49,9 @@ class TopicManager(boostrapIp: String) {
try {
result.all().get()
} catch (ex: Exception) {
System.out.println(ex.toString())
logger.error {ex.toString()}
}
System.out.println("Topics deleted")
logger.info {"Topics deleted"}
}
fun getTopics(): ListTopicsResult? {
......
......@@ -3,14 +3,10 @@ package theodolite.k8s
import io.fabric8.kubernetes.api.model.Service
import io.fabric8.kubernetes.api.model.apps.Deployment
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import org.apache.kafka.common.internals.Topic
import theodolite.util.Benchmark
import theodolite.util.LoadDimension
import theodolite.util.Resource
import theodolite.k8s.WorkloadGeneratorStateCleaner
import java.io.FileNotFoundException
class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
val workloadGeneratorStateCleaner: WorkloadGeneratorStateCleaner
......@@ -31,49 +27,52 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
this.yamlLoader = YamlLoader(this.kubernetesClient)
this.deploymentManager = DeploymentManager(this.kubernetesClient)
this.serviceManager = ServiceManager(this.kubernetesClient)
ucDeployment = this.yamlLoader.loadDeployment(this.config.ucDeploymentPath)!!
ucService = this.yamlLoader.loadService(this.config.ucServicePath)!!
ucDeployment = this.yamlLoader.loadDeployment(this.config.ucDeploymentPath)
ucService = this.yamlLoader.loadService(this.config.ucServicePath)
}
override fun clearClusterEnvironment() {
this.workloadGeneratorStateCleaner.deleteAll()
this.topicManager.deleteTopics(this.config.kafkaTopics)
this.deploymentManager.delete(this.ucDeployment)
this.serviceManager.delete(this.ucService)
}
override fun start(load: LoadDimension, resources: Resource) {
// TODO("extract code to dedicated functions. And, we should create a abstration layer to create the benchmark core, which are identically by all benchmarks")
override fun initializeClusterEnvironment() {
this.workloadGeneratorStateCleaner.deleteAll()
this.topicManager.deleteTopics(this.config.kafkaTopics)
this.topicManager.createTopics(this.config.kafkaTopics, this.config.kafkaPartition, this.config.kafkaReplication)
}
override fun startSUT(resources: Resource) {
this.deploymentManager.setImageName(ucDeployment, "uc-application", this.config.ucImageURL)
// set environment variables
val environmentVariables: MutableMap<String, String> = mutableMapOf()
environmentVariables.put("KAFKA_BOOTSTRAP_SERVER", this.config.kafkaIPConnectionString)
// environmentVariables.put("replicas", this.config.deploymentReplicas) TODO("add possibility to set replicas")
environmentVariables.put("SCHEMA_REGISTRY_URL", this.config.schemaRegistryConnectionString)
// setup deployment
this.deploymentManager.setReplica(ucDeployment, resources.get())
this.deploymentManager.setWorkloadEnv(ucDeployment,"uc-application", environmentVariables)
// create kubernetes resources
this.deploymentManager.deploy(ucDeployment)
this.serviceManager.deploy(ucService)
this.startWorkloadGenerator("uc1", load, "uc1")
}
override fun stop() {
this.workloadGeneratorStateCleaner.deleteAll()
this.topicManager.deleteTopics(this.config.kafkaTopics)
this.deploymentManager.delete(this.ucDeployment)
this.serviceManager.delete(this.ucService)
}
override fun startWorkloadGenerator(wg: String, load: LoadDimension, ucId: String) {
override fun startWorkloadGenerator(load: LoadDimension) {
this.deploymentManager.setImageName(ucDeployment, "workload-generator", this.config.wgImageURL)
val wgDeployment = this.yamlLoader.loadDeployment(this.config.wgDeploymentPath)
val environmentVariables: MutableMap<String, String> = mutableMapOf()
environmentVariables.put("NUM_SENSORS", load.get().toString())
// TODO ("calculate number of required instances")
val requiredInstances: Int = 1
val environmentVariables: MutableMap<String, String> = mutableMapOf()
environmentVariables.put("NUM_SENSORS", load.get().toString())
environmentVariables.put("NUM_INSTANCES", requiredInstances.toString())
wgDeployment?.let { this.deploymentManager.setWorkloadEnv(it, "workload-generator", environmentVariables) }
this.deploymentManager.setWorkloadEnv(wgDeployment, "workload-generator", environmentVariables)
}
data class UC1BenchmarkConfig(
......@@ -84,8 +83,9 @@ class UC1Benchmark(config: UC1BenchmarkConfig) : Benchmark(config) {
val kafkaReplication: Short,
val kafkaPartition: Int,
val ucDeploymentPath: String,
val ucDeploymentReplicas: String,
val ucServicePath: String,
val wgDeploymentPath: String
val wgDeploymentPath: String,
val ucImageURL: String,
val wgImageURL: String
) {}
}
\ No newline at end of file
package theodolite.k8s
import mu.KotlinLogging
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.WatchedEvent
import org.apache.zookeeper.Watcher
import org.apache.zookeeper.ZooKeeper
private val logger = KotlinLogging.logger {}
class WorkloadGeneratorStateCleaner(ip: String) {
val path = "/workload-generation"
val sessionTimeout = 60
......@@ -16,7 +20,7 @@ class WorkloadGeneratorStateCleaner(ip: String) {
val watcher: Watcher = ZookeperWatcher() // defined below
zookeeperClient = ZooKeeper(ip, sessionTimeout, watcher)
} catch (e: Exception) {
System.out.println(e.toString())
logger.error {e.toString()}
}
}
......@@ -28,7 +32,7 @@ class WorkloadGeneratorStateCleaner(ip: String) {
try {
zookeeperClient.delete(path, -1)
} catch (ex: Exception) {
System.out.println(ex.toString())
logger.error {ex.toString()}
}
try {
......@@ -42,15 +46,15 @@ class WorkloadGeneratorStateCleaner(ip: String) {
deleted = true
}
is InterruptedException -> {
System.out.println(ex.toString())
logger.error {ex.toString()}
}
}
}
Thread.sleep(retryTime)
System.out.println("ZooKeeper reset was not successful. Retrying in 5s")
logger.info {"ZooKeeper reset was not successful. Retrying in 5s"}
}
System.out.println("ZooKeeper reset was successful")
logger.info {"ZooKeeper reset was successful"}
}
private class ZookeperWatcher : Watcher {
......
......@@ -5,5 +5,5 @@ import theodolite.util.LoadDimension
import theodolite.util.Resource
abstract class RestrictionStrategy(val results: Results) {
public abstract fun next(load: LoadDimension, resources: List<Resource>): List<Resource>;
public abstract fun next(load: LoadDimension, resources: List<Resource>): List<Resource>
}
\ No newline at end of file
......@@ -7,7 +7,7 @@ import theodolite.util.Results
import java.lang.IllegalArgumentException
class BinarySearch(benchmarkExecutor: BenchmarkExecutor, results: Results) : SearchStrategy(benchmarkExecutor, results) {
override fun findSuitableResources(load: LoadDimension, resources: List<Resource>): Resource? {
override fun findSuitableResource(load: LoadDimension, resources: List<Resource>): Resource? {
val result = search(load, resources, 0, resources.size - 1)
if( result == -1 ) {
return null;
......@@ -19,22 +19,23 @@ class BinarySearch(benchmarkExecutor: BenchmarkExecutor, results: Results) : Sea
if (lower > upper) {
throw IllegalArgumentException()
}
// special case: length == 1 or 2
if (lower == upper) {
if (this.benchmarkExecutor.runExperiment(load, resources[lower])) return lower;
if (this.benchmarkExecutor.runExperiment(load, resources[lower])) return lower
else {
if (lower + 1 == resources.size) return - 1
return lower + 1;
return lower + 1
}
} else {
// (true, true), (false, true), (false, false) // (false, false, false, true, false, true, false, true)
// apply binary search for a list with length > 2 and adjust upper and lower depending on the result for `resources[mid]`
val mid = (upper + lower) / 2
if (this.benchmarkExecutor.runExperiment(load, resources[mid])) {
if (mid == lower) {
return lower
}
return search(load, resources, lower, mid - 1 );
return search(load, resources, lower, mid - 1 )
} else {
return search(load, resources, mid + 1 , upper);
return search(load, resources, mid + 1 , upper)
}
}
}
......
......@@ -8,11 +8,11 @@ import theodolite.util.Results
class CompositeStrategy(benchmarkExecutor: BenchmarkExecutor, val searchStrategy: SearchStrategy, val restrictionStrategies: Set<RestrictionStrategy>, results: Results) : SearchStrategy(benchmarkExecutor, results) {
override fun findSuitableResources(load: LoadDimension, resources: List<Resource>): Resource? {
override fun findSuitableResource(load: LoadDimension, resources: List<Resource>): Resource? {
var restrictedResources = resources.toList()
for (strategy in this.restrictionStrategies) {
restrictedResources = restrictedResources.intersect(strategy.next(load, resources)).toList()
}
return this.searchStrategy.findSuitableResources(load, restrictedResources)
return this.searchStrategy.findSuitableResource(load, restrictedResources)
}
}
\ No newline at end of file
......@@ -7,7 +7,7 @@ import theodolite.util.Results
class LinearSearch(benchmarkExecutor: BenchmarkExecutor, results: Results) : SearchStrategy(benchmarkExecutor, results) {
override fun findSuitableResources(load: LoadDimension, resources: List<Resource>): Resource? {
override fun findSuitableResource(load: LoadDimension, resources: List<Resource>): Resource? {
for (res in resources) {
if (this.benchmarkExecutor.runExperiment(load, res)) return res
}
......
......@@ -6,5 +6,5 @@ import theodolite.util.Resource
import theodolite.util.Results
abstract class SearchStrategy(val benchmarkExecutor: BenchmarkExecutor, val results: Results) {
abstract fun findSuitableResources(load: LoadDimension, resources: List<Resource>): Resource?;
abstract fun findSuitableResource(load: LoadDimension, resources: List<Resource>): Resource?;
}
\ No newline at end of file
......@@ -4,10 +4,17 @@ import theodolite.k8s.UC1Benchmark
// todo: needs cluster and resource config
abstract class Benchmark(val config: UC1Benchmark.UC1BenchmarkConfig) {
abstract fun start(load: LoadDimension, resources: Resource);
fun start(load: LoadDimension, resources: Resource) {
this.initializeClusterEnvironment()
this.startSUT(resources)
this.startWorkloadGenerator(load)
}
abstract fun initializeClusterEnvironment();
abstract fun clearClusterEnvironment();
abstract fun stop();
abstract fun startSUT(resources: Resource);
abstract fun startWorkloadGenerator(wg: String, load: LoadDimension, ucId: String);
abstract fun startWorkloadGenerator(load: LoadDimension);
}
\ No newline at end of file
package theodolite.util
class KafkaBenchmark(config: Map<String, Any>): Benchmark(config) {
override fun start(load: LoadDimension, resources: Resource) {
TODO("Not yet implemented")
}
override fun stop() {
TODO("Not yet implemented")
}
override fun startWorkloadGenerator(wg: String, load: LoadDimension, ucId: String) {
TODO("Not yet implemented")
}
}
\ No newline at end of file
package theodolite.util
class TestBenchmark: Benchmark(config = emptyMap()) {
override fun start() {
import theodolite.k8s.UC1Benchmark
class TestBenchmark: Benchmark(UC1Benchmark.UC1BenchmarkConfig(
zookeeperConnectionString = "",
kafkaIPConnectionString = "",
schemaRegistryConnectionString = "",
kafkaTopics = emptyList(),
kafkaReplication = 0,
kafkaPartition = 0,
ucServicePath = "",
ucDeploymentPath = "",
wgDeploymentPath = "",
ucImageURL = "",
wgImageURL = ""
)){
override fun initializeClusterEnvironment() {
TODO("Not yet implemented")
}
override fun clearClusterEnvironment() {
TODO("Not yet implemented")
}
override fun stop() {
override fun startSUT(resources: Resource) {
TODO("Not yet implemented")
}
override fun startWorkloadGenerator(wg: String, dimValue: Int, ucId: String) {
override fun startWorkloadGenerator(load: LoadDimension) {
TODO("Not yet implemented")
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment