From a0fb76e021900c7cb9e90003034ed406b2862e4e Mon Sep 17 00:00:00 2001 From: lorenz <stu203404@mail.uni-kiel.de> Date: Sun, 10 Jan 2021 22:57:36 +0100 Subject: [PATCH] Add create and delete topics --- .../src/main/kotlin/theodolite/Main.kt | 5 +++ .../src/main/kotlin/theodolite/RunUc.kt | 42 +++++++++++++++++-- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/Main.kt b/theodolite-quarkus/src/main/kotlin/theodolite/Main.kt index cf6a7cb5d..705d18a25 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/Main.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/Main.kt @@ -8,7 +8,12 @@ object Main { fun main(args: Array<String>) { println("Running main method") + val run = RunUc() + val testtopic = mapOf<String,Int>("test" to 1) + + run.createTopics(testtopic, 1.toShort()) + run.deleteTopics(listOf("test")) //Quarkus.run() } } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/RunUc.kt b/theodolite-quarkus/src/main/kotlin/theodolite/RunUc.kt index dba618b46..c88bc17f0 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/RunUc.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/RunUc.kt @@ -1,7 +1,27 @@ package theodolite -class RunUc { - fun waitExecution(executionMinutes: Int) { +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.AdminClientConfig +import org.apache.kafka.clients.admin.ListTopicsResult +import org.apache.kafka.clients.admin.NewTopic +import java.util.* + +class RunUc (){ + val bootstrapServer = "my-confluent-cp-zookeeper:2181" + val ip = "172.18.0.9:5556" + val props = hashMapOf<String, Any>(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "172.18.0.9:5556") + lateinit var kafkaAdmin: AdminClient + + init { + try { + kafkaAdmin = AdminClient.create(props) + } + catch (e: Exception) { + System.out.println(e.toString()) + + } + } + fun waitExecution(executionMinutes: Int) { var milliToMinutes = 60000 System.out.println("Wait while executing") for (i in 1.rangeTo(executionMinutes)) { @@ -12,12 +32,26 @@ class RunUc { System.out.println("Execution finished") } - fun create_topics(topics: List<Pair<String,String>>){ + fun createTopics(topics: Map<String, Int>,replicationfactor: Short) { + val newTopics = mutableSetOf<NewTopic>() + for (i in topics) { + val tops = NewTopic(i.key,i.value,replicationfactor) + newTopics.add(tops) + } + kafkaAdmin.createTopics(newTopics) + System.out.println("Topics created") } - fun delete_topics(topics: List<Pair<String,String>>){ + fun deleteTopics(topics: List<String>) { + + var result = kafkaAdmin.deleteTopics(topics) + System.out.println(result.values().toString()) + + } + fun getTopics(): ListTopicsResult? { + return kafkaAdmin.listTopics() } fun reset_zookeeper(){ -- GitLab