diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt index 11485285e851fb6a59232bdff3119781b1de378c..027b9328336adcfc66299c94335a895d48a7181f 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt @@ -1,6 +1,7 @@ package theodolite.execution import io.quarkus.runtime.annotations.RegisterForReflection +import mu.KotlinLogging import theodolite.benchmark.Benchmark import theodolite.benchmark.BenchmarkExecution import theodolite.evaluation.AnalysisExecutor @@ -10,6 +11,8 @@ import theodolite.util.Resource import theodolite.util.Results import java.time.Duration +private val logger = KotlinLogging.logger {} + @RegisterForReflection class BenchmarkExecutorImpl( benchmark: Benchmark, @@ -21,8 +24,15 @@ class BenchmarkExecutorImpl( override fun runExperiment(load: LoadDimension, res: Resource): Boolean { var result = false val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides) - benchmarkDeployment.setup() - this.waitAndLog() + + try { + benchmarkDeployment.setup() + this.waitAndLog() + } catch(e: Exception) { + logger.error { "Error while setup experiment." } + logger.error { "Error is: $e" } + this.run.set(false) + } if (this.run.get()) { result = diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/Main.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/Main.kt new file mode 100644 index 0000000000000000000000000000000000000000..3246d0a7930aab22d16ed94a6c0f10a9d3fde10e --- /dev/null +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/Main.kt @@ -0,0 +1,25 @@ +package theodolite.execution + +import io.quarkus.runtime.annotations.QuarkusMain +import mu.KotlinLogging +import theodolite.execution.operator.TheodoliteOperator +import kotlin.system.exitProcess + +private val logger = KotlinLogging.logger {} + +@QuarkusMain +object Main { + + @JvmStatic + fun main(args: Array<String>) { + + val mode = System.getenv("MODE") ?: "operator" + logger.info { "Start Theodolite with mode $mode" } + + when(mode) { + "yaml-executor" -> TheodoliteYamlExecutor().start() + "operator" -> TheodoliteOperator().start() + else -> {logger.error { "MODE $mode not found" }; exitProcess(1)} + } + } +} \ No newline at end of file diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteController.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteController.kt deleted file mode 100644 index e4bd60b7645766d1fda8ecb208bc8b619b1f782e..0000000000000000000000000000000000000000 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteController.kt +++ /dev/null @@ -1,145 +0,0 @@ -package theodolite.execution - -import io.fabric8.kubernetes.client.NamespacedKubernetesClient -import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext -import io.fabric8.kubernetes.client.informers.ResourceEventHandler -import io.fabric8.kubernetes.client.informers.SharedInformer -import mu.KotlinLogging -import theodolite.benchmark.BenchmarkExecution -import theodolite.benchmark.KubernetesBenchmark -import java.lang.Thread.sleep -import java.util.* -import kotlin.collections.HashMap -import kotlin.collections.set - -private val logger = KotlinLogging.logger {} - - -class TheodoliteController( - val client: NamespacedKubernetesClient, - val informerBenchmarkExecution: SharedInformer<BenchmarkExecution>, - val informerBenchmarkType: SharedInformer<KubernetesBenchmark>, - val executionContext: CustomResourceDefinitionContext -) { - lateinit var executor: TheodoliteExecutor - val executionsQueue: Deque<BenchmarkExecution> = LinkedList<BenchmarkExecution>() - val benchmarks: MutableMap<String, KubernetesBenchmark> = HashMap() - var isUpdated = false - - /** - * Adds the EventHandler to kubernetes - */ - fun create() { - informerBenchmarkExecution.addEventHandler(object : ResourceEventHandler<BenchmarkExecution> { - override fun onAdd(execution: BenchmarkExecution) { - execution.name = execution.metadata.name - logger.info { "Add new execution ${execution.metadata.name} to queue" } - executionsQueue.add(execution) - - } - - override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) { - logger.info { "Add updated execution to queue" } - - newExecution.name = newExecution.metadata.name - executionsQueue.removeIf { e -> e.name == newExecution.metadata.name } - executionsQueue.addFirst(newExecution) - - if (::executor.isInitialized && executor.getExecution().name == newExecution.metadata.name) { - isUpdated = true - executor.executor.run.compareAndSet(true, false) - } - } - - override fun onDelete(execution: BenchmarkExecution, b: Boolean) { - logger.info { "Delete execution ${execution.metadata.name} from queue" } - executionsQueue.removeIf { e -> e.name == execution.metadata.name } - if (::executor.isInitialized && executor.getExecution().name == execution.metadata.name) { - isUpdated = true - executor.executor.run.compareAndSet(true, false) - logger.info { "Current benchmark stopped" } - } - } - }) - - informerBenchmarkType.addEventHandler(object : ResourceEventHandler<KubernetesBenchmark> { - override fun onAdd(benchmark: KubernetesBenchmark) { - benchmark.name = benchmark.metadata.name - logger.info { "Add new benchmark ${benchmark.name}" } - benchmarks[benchmark.name] = benchmark - } - - override fun onUpdate(oldBenchmark: KubernetesBenchmark, newBenchmark: KubernetesBenchmark) { - logger.info { "Update benchmark ${newBenchmark.metadata.name}" } - newBenchmark.name = newBenchmark.metadata.name - if (::executor.isInitialized && executor.getBenchmark().name == oldBenchmark.metadata.name) { - isUpdated = true - executor.executor.run.compareAndSet(true, false) - } else { - onAdd(newBenchmark) - } - } - - override fun onDelete(benchmark: KubernetesBenchmark, b: Boolean) { - logger.info { "Delete benchmark ${benchmark.metadata.name}" } - benchmarks.remove(benchmark.metadata.name) - if (::executor.isInitialized && executor.getBenchmark().name == benchmark.metadata.name) { - isUpdated = true - executor.executor.run.compareAndSet(true, false) - logger.info { "Current benchmark stopped" } - } - } - }) - } - - fun run() { - while (true) { - try { - reconcile() - logger.info { "Theodolite is waiting for new matching benchmark and execution" } - logger.info { "Currently available executions: " } - executionsQueue.forEach { - logger.info { "${it.name} : waiting for : ${it.benchmark}" } - } - logger.info { "Currently available benchmarks: " } - benchmarks.forEach { - logger.info { it.key } - } - sleep(2000) - } catch (e: InterruptedException) { - logger.error { "Execution interrupted with error: $e" } - } - } - } - - @Synchronized - private fun reconcile() { - while (executionsQueue.isNotEmpty()) { - - val execution = executionsQueue.peek() - - val benchmark = benchmarks[execution.benchmark] - - if (benchmark == null) { - logger.debug { "No benchmark found for execution ${execution.name}" } - sleep(1000) - } else { - runExecution(execution, benchmark) - } - } - } - - @Synchronized - fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) { - isUpdated = false - logger.info { "Start execution ${execution.name} with benchmark ${benchmark.name}" } - executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark) - executor.run() - - if (!isUpdated) { - client.customResource(executionContext).delete(client.namespace, execution.metadata.name) - } - - logger.info { "Execution of ${execution.name} is finally stopped" } - } -} diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteOperator.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteOperator.kt deleted file mode 100644 index 478fdce0cd9378488d46be0baa75893131a02cbb..0000000000000000000000000000000000000000 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteOperator.kt +++ /dev/null @@ -1,76 +0,0 @@ -package theodolite.execution - -import io.fabric8.kubernetes.client.DefaultKubernetesClient -import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext -import io.fabric8.kubernetes.internal.KubernetesDeserializer -import io.quarkus.runtime.annotations.QuarkusMain -import mu.KotlinLogging -import theodolite.benchmark.BenchmarkExecution -import theodolite.benchmark.BenchmarkExecutionList -import theodolite.benchmark.KubernetesBenchmark -import theodolite.benchmark.KubernetesBenchmarkList - - -private var DEFAULT_NAMESPACE = "default" -private val logger = KotlinLogging.logger {} - -@QuarkusMain(name = "TheodoliteOperator") -object TheodoliteOperator { - @JvmStatic - fun main(args: Array<String>) { - - val namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE - logger.info { "Using $namespace as namespace." } - - - val client = DefaultKubernetesClient().inNamespace(namespace) - - KubernetesDeserializer.registerCustomKind( - "theodolite.com/v1alpha1", - "execution", - BenchmarkExecution::class.java - ) - - KubernetesDeserializer.registerCustomKind( - "theodolite.com/v1alpha1", - "benchmark", - KubernetesBenchmark::class.java - ) - - val executionContext = CustomResourceDefinitionContext.Builder() - .withVersion("v1alpha1") - .withScope("Namespaced") - .withGroup("theodolite.com") - .withPlural("executions") - .build() - - val typeContext = CustomResourceDefinitionContext.Builder() - .withVersion("v1alpha1") - .withScope("Namespaced") - .withGroup("theodolite.com") - .withPlural("benchmarks") - .build() - - val informerFactory = client.informers() - - val informerBenchmarkExecution = informerFactory.sharedIndexInformerForCustomResource( - executionContext, BenchmarkExecution::class.java, - BenchmarkExecutionList::class.java, 10 * 60 * 1000.toLong() - ) - - val informerBenchmarkType = informerFactory.sharedIndexInformerForCustomResource( - typeContext, KubernetesBenchmark::class.java, - KubernetesBenchmarkList::class.java, 10 * 60 * 1000.toLong() - ) - val controller = TheodoliteController( - client = client, - informerBenchmarkExecution = informerBenchmarkExecution, - informerBenchmarkType = informerBenchmarkType, - executionContext = executionContext - ) - - controller.create() - informerFactory.startAllRegisteredInformers() - controller.run() - } -} diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt index 67cad64e4cb82d2f68fbcb2cfab715f4a564d245..f35e43a8f95d5fc20a9000af7b55d95881694804 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt @@ -1,6 +1,5 @@ package theodolite.execution -import io.quarkus.runtime.annotations.QuarkusMain import mu.KotlinLogging import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.KubernetesBenchmark @@ -10,10 +9,10 @@ import kotlin.system.exitProcess private val logger = KotlinLogging.logger {} -@QuarkusMain(name = "TheodoliteYamlExecutor") -object TheodoliteYamlExecutor { - @JvmStatic - fun main(args: Array<String>) { +class TheodoliteYamlExecutor { + private val parser = YamlParser() + + fun start() { logger.info { "Theodolite started" } val executionPath = System.getenv("THEODOLITE_EXECUTION") ?: "./config/BenchmarkExecution.yaml" @@ -26,7 +25,6 @@ object TheodoliteYamlExecutor { // load the BenchmarkExecution and the BenchmarkType - val parser = YamlParser() val benchmarkExecution = parser.parse(path = executionPath, E = BenchmarkExecution::class.java)!! val benchmark = diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/BenchmarkEventHandler.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/BenchmarkEventHandler.kt new file mode 100644 index 0000000000000000000000000000000000000000..2b4a784315c2961c5782264c44f2c7e4e8f0d2e8 --- /dev/null +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/BenchmarkEventHandler.kt @@ -0,0 +1,35 @@ +package theodolite.execution.operator + +import io.fabric8.kubernetes.client.informers.ResourceEventHandler +import mu.KotlinLogging +import theodolite.benchmark.KubernetesBenchmark +private val logger = KotlinLogging.logger {} + +class BenchmarkEventHandler(private val controller: TheodoliteController): ResourceEventHandler<KubernetesBenchmark> { + override fun onAdd(benchmark: KubernetesBenchmark) { + benchmark.name = benchmark.metadata.name + logger.info { "Add new benchmark ${benchmark.name}." } + this.controller.benchmarks[benchmark.name] = benchmark + } + + override fun onUpdate(oldBenchmark: KubernetesBenchmark, newBenchmark: KubernetesBenchmark) { + logger.info { "Update benchmark ${newBenchmark.metadata.name}." } + newBenchmark.name = newBenchmark.metadata.name + if (this.controller.isInitialized() && this.controller.executor.getBenchmark().name == oldBenchmark.metadata.name) { + this.controller.isUpdated.set(true) + this.controller.executor.executor.run.compareAndSet(true, false) + } else { + onAdd(newBenchmark) + } + } + + override fun onDelete(benchmark: KubernetesBenchmark, b: Boolean) { + logger.info { "Delete benchmark ${benchmark.metadata.name}." } + this.controller.benchmarks.remove(benchmark.metadata.name) + if ( this.controller.isInitialized() && this.controller.executor.getBenchmark().name == benchmark.metadata.name) { + this.controller.isUpdated.set(true) + this.controller.executor.executor.run.compareAndSet(true, false) + logger.info { "Current benchmark stopped." } + } + } +} diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ExecutionEventHandler.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ExecutionEventHandler.kt new file mode 100644 index 0000000000000000000000000000000000000000..1752ac112ea84ea179e238f7ab8d808779014d1b --- /dev/null +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ExecutionEventHandler.kt @@ -0,0 +1,36 @@ +package theodolite.execution.operator + +import io.fabric8.kubernetes.client.informers.ResourceEventHandler +import mu.KotlinLogging +import theodolite.benchmark.BenchmarkExecution + +private val logger = KotlinLogging.logger {} + +class ExecutionHandler(private val controller: TheodoliteController): ResourceEventHandler<BenchmarkExecution> { + override fun onAdd(execution: BenchmarkExecution) { + execution.name = execution.metadata.name + logger.info { "Add new execution ${execution.metadata.name} to queue." } + this.controller.executionsQueue.add(execution) + } + + override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) { + logger.info { "Add updated execution to queue." } + newExecution.name = newExecution.metadata.name + this.controller.executionsQueue.removeIf { e -> e.name == newExecution.metadata.name } + this.controller.executionsQueue.addFirst(newExecution) + if (this.controller.isInitialized() && this.controller.executor.getExecution().name == newExecution.metadata.name) { + this.controller.isUpdated.set(true) + this.controller.executor.executor.run.compareAndSet(true, false) + } + } + + override fun onDelete(execution: BenchmarkExecution, b: Boolean) { + logger.info { "Delete execution ${execution.metadata.name} from queue." } + this.controller.executionsQueue.removeIf { e -> e.name == execution.metadata.name } + if (this.controller.isInitialized() && this.controller.executor.getExecution().name == execution.metadata.name) { + this.controller.isUpdated.set(true) + this.controller.executor.executor.run.compareAndSet(true, false) + logger.info { "Current benchmark stopped." } + } + } +} diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/K8sContextFactory.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/K8sContextFactory.kt new file mode 100644 index 0000000000000000000000000000000000000000..94adf61a435df743aea168c5451b2f3faa24ca17 --- /dev/null +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/K8sContextFactory.kt @@ -0,0 +1,15 @@ +package theodolite.execution.operator + +import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext + +class K8sContextFactory { + + fun create(api: String, scope: String, group: String, plural: String ) : CustomResourceDefinitionContext{ + return CustomResourceDefinitionContext.Builder() + .withVersion(api) + .withScope(scope) + .withGroup(group) + .withPlural(plural) + .build() + } +} \ No newline at end of file diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt new file mode 100644 index 0000000000000000000000000000000000000000..c966739a4744f448f60223738d379753b098c060 --- /dev/null +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt @@ -0,0 +1,77 @@ +package theodolite.execution.operator + +import io.fabric8.kubernetes.client.NamespacedKubernetesClient +import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext +import mu.KotlinLogging +import theodolite.benchmark.BenchmarkExecution +import theodolite.benchmark.KubernetesBenchmark +import theodolite.execution.TheodoliteExecutor +import java.lang.Thread.sleep +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentLinkedDeque +import java.util.concurrent.atomic.AtomicBoolean + +private val logger = KotlinLogging.logger {} + +class TheodoliteController( + val client: NamespacedKubernetesClient, + val executionContext: CustomResourceDefinitionContext +) { + lateinit var executor: TheodoliteExecutor + val executionsQueue: ConcurrentLinkedDeque<BenchmarkExecution> = ConcurrentLinkedDeque() + val benchmarks: ConcurrentHashMap<String, KubernetesBenchmark> = ConcurrentHashMap() + var isUpdated = AtomicBoolean(false) + + fun run() { + while (true) { + try { + reconcile() + logger.info { "Theodolite is waiting for new matching benchmark and execution." } + logger.info { "Currently available executions: " } + executionsQueue.forEach { + logger.info { "${it.name} : waiting for : ${it.benchmark}" } + } + logger.info { "Currently available benchmarks: " } + benchmarks.forEach { + logger.info { it.key } + } + sleep(2000) + } catch (e: InterruptedException) { + logger.error { "Execution interrupted with error: $e." } + } + } + } + + @Synchronized + private fun reconcile() { + while (executionsQueue.isNotEmpty()) { + val execution = executionsQueue.peek() + val benchmark = benchmarks[execution.benchmark] + + if (benchmark == null) { + logger.debug { "No benchmark found for execution ${execution.name}." } + sleep(1000) + } else { + runExecution(execution, benchmark) + } + } + } + + @Synchronized + fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) { + isUpdated.set(false) + logger.info { "Start execution ${execution.name} with benchmark ${benchmark.name}." } + executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark) + executor.run() + + if (!isUpdated.get()) { + client.customResource(executionContext).delete(client.namespace, execution.metadata.name) + } + logger.info { "Execution of ${execution.name} is finally stopped." } + } + + @Synchronized + fun isInitialized(): Boolean { + return ::executor.isInitialized + } +} diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt new file mode 100644 index 0000000000000000000000000000000000000000..33b71c8fa247511c371b7a154071634db7092660 --- /dev/null +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt @@ -0,0 +1,64 @@ +package theodolite.execution.operator + +import io.fabric8.kubernetes.client.DefaultKubernetesClient +import io.fabric8.kubernetes.internal.KubernetesDeserializer +import mu.KotlinLogging +import theodolite.benchmark.BenchmarkExecution +import theodolite.benchmark.BenchmarkExecutionList +import theodolite.benchmark.KubernetesBenchmark +import theodolite.benchmark.KubernetesBenchmarkList + + +private const val DEFAULT_NAMESPACE = "default" +private const val SCOPE = "Namespaced" +private const val EXECUTION_SINGULAR = "execution" +private const val EXECUTION_PLURAL = "executions" +private const val BENCHMARK_SINGULAR = "benchmark" +private const val BENCHMARK_PLURAL = "benchmarks" +private const val API_VERSION = "v1alpha1" +private const val RESYNC_PERIOD = 10 * 60 * 1000.toLong() +private const val GROUP = "theodolite.com" +private val logger = KotlinLogging.logger {} + +class TheodoliteOperator { + private val namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE + + fun start() { + logger.info { "Using $namespace as namespace." } + val client = DefaultKubernetesClient().inNamespace(namespace) + + KubernetesDeserializer.registerCustomKind( + "$GROUP/$API_VERSION", + EXECUTION_SINGULAR, + BenchmarkExecution::class.java + ) + + KubernetesDeserializer.registerCustomKind( + "$GROUP/$API_VERSION", + BENCHMARK_SINGULAR, + KubernetesBenchmark::class.java + ) + + val contextFactory = K8sContextFactory() + val executionContext = contextFactory.create(API_VERSION, SCOPE, GROUP, EXECUTION_PLURAL) + val benchmarkContext = contextFactory.create(API_VERSION, SCOPE, GROUP, BENCHMARK_PLURAL) + + val controller = TheodoliteController(client = client, executionContext = executionContext) + + val informerFactory = client.informers() + val informerExecution = informerFactory.sharedIndexInformerForCustomResource( + executionContext, BenchmarkExecution::class.java, + BenchmarkExecutionList::class.java, RESYNC_PERIOD + ) + val informerBenchmark = informerFactory.sharedIndexInformerForCustomResource( + benchmarkContext, KubernetesBenchmark::class.java, + KubernetesBenchmarkList::class.java, RESYNC_PERIOD + ) + + informerExecution.addEventHandler(ExecutionHandler(controller)) + informerBenchmark.addEventHandler(BenchmarkEventHandler(controller)) + informerFactory.startAllRegisteredInformers() + + controller.run() + } +} diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt index 1336f57517ef74d8c781cc3b51bf130dbf8d99c5..390974cd247645197ebe6044bf785710164155aa 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt @@ -12,52 +12,35 @@ private val logger = KotlinLogging.logger {} * Manages the topics related tasks * @param kafkaConfig Kafka Configuration as HashMap */ -class TopicManager(kafkaConfig: HashMap<String, Any>) { - private lateinit var kafkaAdmin: AdminClient - - init { - try { - kafkaAdmin = AdminClient.create(kafkaConfig) - } catch (e: Exception) { - logger.error { e.toString() } - } - } +class TopicManager(private val kafkaConfig: HashMap<String, Any>) { /** * Creates topics. * @param newTopics List of all Topic which should be created */ fun createTopics(newTopics: Collection<NewTopic>) { + var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) kafkaAdmin.createTopics(newTopics) + kafkaAdmin.close() logger.info { "Topics created" } } - fun createTopics(topics: List<String>, numPartitions: Int, replicationFactor: Short) { - val newTopics = mutableSetOf<NewTopic>() - for (i in topics) { - val tops = NewTopic(i, numPartitions, replicationFactor) - newTopics.add(tops) - } - kafkaAdmin.createTopics(newTopics) - logger.info { "Creation of $topics started" } - } /** * Removes topics. * @param topics */ fun removeTopics(topics: List<String>) { + var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) val result = kafkaAdmin.deleteTopics(topics) try { result.all().get() - } catch (ex: Exception) { - logger.error { ex.toString() } + } catch (e: Exception) { + logger.error { "Error while removing topics: $e" } + logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." } } + kafkaAdmin.close() logger.info { "Topics removed" } } - - fun getTopics(): ListTopicsResult? { - return kafkaAdmin.listTopics() - } } diff --git a/theodolite-quarkus/src/main/resources/application.properties b/theodolite-quarkus/src/main/resources/application.properties index e85374958791e1dc22c37b6496e4c9ea3f2f0654..d5ff26fd074ec74c02a25bc4dd0dd3734433401b 100644 --- a/theodolite-quarkus/src/main/resources/application.properties +++ b/theodolite-quarkus/src/main/resources/application.properties @@ -1,4 +1,3 @@ -quarkus.package.main-class=TheodoliteOperator quarkus.native.additional-build-args=\ --initialize-at-run-time=io.fabric8.kubernetes.client.internal.CertUtils,\ --report-unsupported-elements-at-runtime \ No newline at end of file diff --git a/theodolite-quarkus/src/main/resources/operator/example-benchmark-k8s-resource.yaml b/theodolite-quarkus/src/main/resources/operator/example-benchmark-k8s-resource.yaml index d5fc040b330b1d8198d0a11440f9621b73125d20..9fc16f92e303f05a449f7e8b83600c3b299f215d 100644 --- a/theodolite-quarkus/src/main/resources/operator/example-benchmark-k8s-resource.yaml +++ b/theodolite-quarkus/src/main/resources/operator/example-benchmark-k8s-resource.yaml @@ -22,7 +22,7 @@ loadTypes: container: "workload-generator" variableName: "NUM_SENSORS" kafkaConfig: - bootstrapServer: "my-confluent-cp-kafka:9092" + bootstrapServer: "localhost:31290" topics: - name: "input" numPartitions: 40