Skip to content
Snippets Groups Projects
Commit a474c9e4 authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

Merge branch 'theodolite-kotlin' of...

Merge branch 'theodolite-kotlin' of git.se.informatik.uni-kiel.de:she/theodolite into 198-update-k8s-theodolite-job
parents b2d7eb17 7d4443e7
Branches
Tags
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!122Update Theodolite Kubernetes Job,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
......@@ -6,6 +6,23 @@ plugins {
applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
run.classpath = sourceSets.main.runtimeClasspath
jar {
manifest {
attributes 'Built-By': System.getProperty('user.name'),
'Build-Jdk': System.getProperty('java.version')
}
}
shadowJar {
configurations = [project.configurations.compile]
zip64 true
}
tasks.distZip.enabled = false
ext {
flinkVersion = '1.12.2'
scalaBinaryVersion = '2.12'
......@@ -48,17 +65,3 @@ dependencies {
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
}
run.classpath = sourceSets.main.runtimeClasspath
jar {
manifest {
attributes 'Built-By': System.getProperty('user.name'),
'Build-Jdk': System.getProperty('java.version')
}
}
shadowJar {
configurations = [project.configurations.compile]
zip64 true
}
......@@ -6,6 +6,8 @@ plugins {
id 'application'
}
tasks.distZip.enabled = false
repositories {
jcenter()
maven {
......
......@@ -6,6 +6,8 @@ plugins {
id 'application'
}
tasks.distZip.enabled = false
repositories {
jcenter()
maven {
......
......@@ -14,4 +14,4 @@ spec:
targetPort: 80
protocol: TCP
- name: metrics
port: 9980
port: 5556
......@@ -3,6 +3,7 @@ package theodolite.execution.operator
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import java.lang.NullPointerException
private val logger = KotlinLogging.logger {}
......@@ -16,7 +17,11 @@ class ExecutionHandler(private val controller: TheodoliteController): ResourceEv
override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) {
logger.info { "Add updated execution to queue." }
newExecution.name = newExecution.metadata.name
try {
this.controller.executionsQueue.removeIf { e -> e.name == newExecution.metadata.name }
} catch(e: NullPointerException) {
logger.warn { "No execution found for deletion" }
}
this.controller.executionsQueue.addFirst(newExecution)
if (this.controller.isInitialized() && this.controller.executor.getExecution().name == newExecution.metadata.name) {
this.controller.isUpdated.set(true)
......@@ -25,8 +30,12 @@ class ExecutionHandler(private val controller: TheodoliteController): ResourceEv
}
override fun onDelete(execution: BenchmarkExecution, b: Boolean) {
logger.info { "Delete execution ${execution.metadata.name} from queue." }
try {
this.controller.executionsQueue.removeIf { e -> e.name == execution.metadata.name }
logger.info { "Delete execution ${execution.metadata.name} from queue." }
} catch(e: NullPointerException) {
logger.warn { "No execution found for deletion" }
}
if (this.controller.isInitialized() && this.controller.executor.getExecution().name == execution.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
......
......@@ -70,9 +70,15 @@ class TheodoliteController(
executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark)
executor.run()
try {
if (!isUpdated.get()) {
this.executionsQueue.removeIf { e -> e.name == execution.name }
client.customResource(executionContext).delete(client.namespace, execution.metadata.name)
}
} catch (e: Exception) {
logger.warn { "Deletion skipped." }
}
logger.info { "Execution of ${execution.name} is finally stopped." }
}
......
......@@ -2,7 +2,6 @@ package theodolite.k8s
import mu.KotlinLogging
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.ListTopicsResult
import org.apache.kafka.clients.admin.NewTopic
import java.util.*
......@@ -20,9 +19,15 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
*/
fun createTopics(newTopics: Collection<NewTopic>) {
var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
kafkaAdmin.createTopics(newTopics)
val result = kafkaAdmin.createTopics(newTopics)
result.all().get()// wait for the future object
logger.info {
"Topics created finished with result: ${
result.values().map { it -> it.key + ": " + it.value.isDone }
.joinToString(separator = ",")
} "
}
kafkaAdmin.close()
logger.info { "Topics created" }
}
......@@ -32,15 +37,19 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
*/
fun removeTopics(topics: List<String>) {
var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
val result = kafkaAdmin.deleteTopics(topics)
try {
result.all().get()
val result = kafkaAdmin.deleteTopics(topics)
result.all().get() // wait for the future object
logger.info {
"\"Topics deletion finished with result: ${
result.values().map { it -> it.key + ": " + it.value.isDone }
.joinToString(separator = ",")
} "
}
} catch (e: Exception) {
logger.error { "Error while removing topics: $e" }
logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." }
}
kafkaAdmin.close()
logger.info { "Topics removed" }
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment