diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/LeaderElector.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/LeaderElector.kt new file mode 100644 index 0000000000000000000000000000000000000000..9d093e4851e5c43d29a3fea3057ccf01be612e63 --- /dev/null +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/LeaderElector.kt @@ -0,0 +1,43 @@ +package theodolite.execution.operator + +import io.fabric8.kubernetes.client.DefaultKubernetesClient +import io.fabric8.kubernetes.client.NamespacedKubernetesClient +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder +import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock +import mu.KotlinLogging +import java.time.Duration +import java.util.* +import kotlin.reflect.KFunction0 + +private val logger = KotlinLogging.logger {} + +class LeaderElector( + val client: NamespacedKubernetesClient, + val name: String + ) { + + fun getLeadership(leader: KFunction0<Unit>) { + val lockIdentity: String = UUID.randomUUID().toString() + DefaultKubernetesClient().use { kc -> + kc.leaderElector() + .withConfig( + LeaderElectionConfigBuilder() + .withName("Theodolite") + .withLeaseDuration(Duration.ofSeconds(15L)) + .withLock(LeaseLock(client.namespace, name, lockIdentity)) + .withRenewDeadline(Duration.ofSeconds(10L)) + .withRetryPeriod(Duration.ofSeconds(2L)) + .withLeaderCallbacks(LeaderCallbacks( + { Thread{leader()}.start() }, + { logger.info { "STOPPED LEADERSHIP" } } + ) { newLeader: String? -> + logger.info { "New leader elected $newLeader" } + }) + .build() + ) + .build().run() + } + } + +} \ No newline at end of file diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt index 49fbf54eecf2a178ae8a0b38bedee1c597a80e1f..0d55b0c1c1c76dba226d34554e0d96a3df77c1c3 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt @@ -1,6 +1,7 @@ package theodolite.execution.operator import io.fabric8.kubernetes.client.DefaultKubernetesClient +import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.fabric8.kubernetes.client.dsl.MixedOperation import io.fabric8.kubernetes.client.dsl.Resource import io.fabric8.kubernetes.internal.KubernetesDeserializer @@ -27,14 +28,22 @@ private val logger = KotlinLogging.logger {} */ class TheodoliteOperator { private val namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE + val client: NamespacedKubernetesClient = DefaultKubernetesClient().inNamespace(namespace) + + + fun start() { + LeaderElector( + client = client, + name = "theodolite-operator" + ) + .getLeadership(::startOperator) + } /** * Start the operator. */ - fun start() { - // FIXME("Remove all benchmark state handling") + private fun startOperator() { logger.info { "Using $namespace as namespace." } - val client = DefaultKubernetesClient().inNamespace(namespace) client.use { KubernetesDeserializer.registerCustomKind( "$GROUP/$API_VERSION",