Skip to content
Snippets Groups Projects
Commit a0fb76e0 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Add create and delete topics

parent bd37a19f
No related branches found
No related tags found
6 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus,!81Feature/126 kafka communication,!79Feature/127 zookeeper communication,!78Resolve "Implement Quarkus/Kotlin protype"
......@@ -8,7 +8,12 @@ object Main {
fun main(args: Array<String>) {
println("Running main method")
val run = RunUc()
val testtopic = mapOf<String,Int>("test" to 1)
run.createTopics(testtopic, 1.toShort())
run.deleteTopics(listOf("test"))
//Quarkus.run()
}
}
package theodolite
class RunUc {
fun waitExecution(executionMinutes: Int) {
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.ListTopicsResult
import org.apache.kafka.clients.admin.NewTopic
import java.util.*
class RunUc (){
val bootstrapServer = "my-confluent-cp-zookeeper:2181"
val ip = "172.18.0.9:5556"
val props = hashMapOf<String, Any>(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "172.18.0.9:5556")
lateinit var kafkaAdmin: AdminClient
init {
try {
kafkaAdmin = AdminClient.create(props)
}
catch (e: Exception) {
System.out.println(e.toString())
}
}
fun waitExecution(executionMinutes: Int) {
var milliToMinutes = 60000
System.out.println("Wait while executing")
for (i in 1.rangeTo(executionMinutes)) {
......@@ -12,12 +32,26 @@ class RunUc {
System.out.println("Execution finished")
}
fun create_topics(topics: List<Pair<String,String>>){
fun createTopics(topics: Map<String, Int>,replicationfactor: Short) {
val newTopics = mutableSetOf<NewTopic>()
for (i in topics) {
val tops = NewTopic(i.key,i.value,replicationfactor)
newTopics.add(tops)
}
kafkaAdmin.createTopics(newTopics)
System.out.println("Topics created")
}
fun delete_topics(topics: List<Pair<String,String>>){
fun deleteTopics(topics: List<String>) {
var result = kafkaAdmin.deleteTopics(topics)
System.out.println(result.values().toString())
}
fun getTopics(): ListTopicsResult? {
return kafkaAdmin.listTopics()
}
fun reset_zookeeper(){
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment