diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/Benchmark.kt b/theodolite-quarkus/src/main/kotlin/theodolite/Benchmark.kt new file mode 100644 index 0000000000000000000000000000000000000000..b680072787989093ab7cbc9a08bec31ed9968d44 --- /dev/null +++ b/theodolite-quarkus/src/main/kotlin/theodolite/Benchmark.kt @@ -0,0 +1,9 @@ +package theodolite + +class Benchmark { + fun start() {} + + fun stop() {} + + fun startWorkloadGenerator(wg: String, dimValue: Int, ucId: String) { } +} diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/BenchmarkExecutor.kt b/theodolite-quarkus/src/main/kotlin/theodolite/BenchmarkExecutor.kt index cf31b75ed26b8f214dd455d6a05e77b8599d6f8e..1790cfe81e96b5de098a7ca3f53d955886cb9827 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/BenchmarkExecutor.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/BenchmarkExecutor.kt @@ -1,82 +1,17 @@ package theodolite -import org.apache.kafka.clients.admin.AdminClient -import org.apache.kafka.clients.admin.AdminClientConfig -import org.apache.kafka.clients.admin.ListTopicsResult -import org.apache.kafka.clients.admin.NewTopic -import org.apache.zookeeper.Watcher -import org.apache.zookeeper.ZooKeeper -import org.apache.zookeeper.WatchedEvent +class BenchmarkExecutor(benchmark: Benchmark) { + val benchmark: Benchmark = benchmark - - - -class RunUc (){ - val bootstrapServer = "my-confluent-cp-zookeeper:2181" - val ip = "172.18.0.9:5556" - val props = hashMapOf<String, Any>(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "172.18.0.9:5556") - lateinit var kafkaAdmin: AdminClient - - init { - try { - kafkaAdmin = AdminClient.create(props) - } - catch (e: Exception) { - System.out.println(e.toString()) - - } - } fun waitExecution(executionMinutes: Int) { - var milliToMinutes = 60000 + val milliToMinutes = 60000 System.out.println("Wait while executing") for (i in 1.rangeTo(executionMinutes)) { - Thread.sleep((milliToMinutes*i).toLong()); - System.out.println("Executed: "+i.toString()+" minutes") + Thread.sleep((milliToMinutes * i).toLong()) + System.out.println("Executed: $i minutes") } System.out.println("Execution finished") } - - fun createTopics(topics: Map<String, Int>,replicationfactor: Short) { - - val newTopics = mutableSetOf<NewTopic>() - for (i in topics) { - val tops = NewTopic(i.key,i.value,replicationfactor) - newTopics.add(tops) - } - kafkaAdmin.createTopics(newTopics) - System.out.println("Topics created") - } - - fun deleteTopics(topics: List<String>) { - - var result = kafkaAdmin.deleteTopics(topics) - System.out.println(result.values().toString()) - - } - - fun getTopics(): ListTopicsResult? { - return kafkaAdmin.listTopics() - - } - - fun resetZookeeper(){ - val watcher :Watcher = startWatcher() - - val zookeeperclient = ZooKeeper(ip,60, watcher) - zookeeperclient.delete("/workload-generation", -1) - System.out.println("Deletion executed") - } - - private fun startWatcher(): Watcher { - return Watcher { event -> - System.out.println(event.toString()) - System.out.println(event.state.toString()) - } - } - - - fun start_workload_generator(wg: String, dim_value:Integer, uc_id: String){ - - } + fun runExperiment() {} } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/TopicManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/TopicManager.kt new file mode 100644 index 0000000000000000000000000000000000000000..a62a8d8254d740f8362ea284c27aa43a6fdee71c --- /dev/null +++ b/theodolite-quarkus/src/main/kotlin/theodolite/TopicManager.kt @@ -0,0 +1,57 @@ +package theodolite + +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.AdminClientConfig +import org.apache.kafka.clients.admin.ListTopicsResult +import org.apache.kafka.clients.admin.NewTopic + +class TopicManager(boostrapIp: String) { + val props = hashMapOf<String, Any>(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to boostrapIp) + lateinit var kafkaAdmin: AdminClient + + init { + try { + kafkaAdmin = AdminClient.create(props) + } catch (e : Exception) { + System.out.println(e.toString()) + } + } + + fun createTopics(topics: Map<String, Int>, replicationfactor: Short) { + + val newTopics = mutableSetOf<NewTopic>() + for (i in topics) { + val tops = NewTopic(i.key, i.value, replicationfactor) + newTopics.add(tops) + } + kafkaAdmin.createTopics(newTopics) + System.out.println("Topics created") + } + + fun createTopics(topics: List<String>, numPartitions: Int, replicationfactor: Short) { + + val newTopics = mutableSetOf<NewTopic>() + for (i in topics) { + val tops = NewTopic(i, numPartitions, replicationfactor) + newTopics.add(tops) + } + kafkaAdmin.createTopics(newTopics) + System.out.println("Creation of $topics started") + } + + fun deleteTopics(topics: List<String>) { + + val result = kafkaAdmin.deleteTopics(topics) + + try { + result.all().get() + } catch (ex:Exception) { + System.out.println(ex.toString()) + } + System.out.println("Topics deleted") + } + + fun getTopics(): ListTopicsResult? { + return kafkaAdmin.listTopics() + } +}