diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/Benchmark.kt b/theodolite-quarkus/src/main/kotlin/theodolite/Benchmark.kt index ddb5bc913f58f319830062c86f7638b929fd86e0..b680072787989093ab7cbc9a08bec31ed9968d44 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/Benchmark.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/Benchmark.kt @@ -1,13 +1,9 @@ package theodolite class Benchmark { + fun start() {} + fun stop() {} - fun start(){ - - } - - fun stop(){ - - } -} \ No newline at end of file + fun startWorkloadGenerator(wg: String, dimValue: Int, ucId: String) { } +} diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/BenchmarkExecutor.kt b/theodolite-quarkus/src/main/kotlin/theodolite/BenchmarkExecutor.kt index afa4acfc638d90ea7ea0f03db6241b4b0093eb6a..1790cfe81e96b5de098a7ca3f53d955886cb9827 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/BenchmarkExecutor.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/BenchmarkExecutor.kt @@ -1,63 +1,17 @@ package theodolite -import org.apache.kafka.clients.admin.AdminClient -import org.apache.kafka.clients.admin.AdminClientConfig -import org.apache.kafka.clients.admin.ListTopicsResult -import org.apache.kafka.clients.admin.NewTopic +class BenchmarkExecutor(benchmark: Benchmark) { + val benchmark: Benchmark = benchmark - -class RunUc (){ - val bootstrapServer = "my-confluent-cp-zookeeper:2181" - val ip = "172.18.0.9:5556" - val props = hashMapOf<String, Any>(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "172.18.0.9:5556") - lateinit var kafkaAdmin: AdminClient - - init { - try { - kafkaAdmin = AdminClient.create(props) - } - catch (e: Exception) { - System.out.println(e.toString()) - - } - } fun waitExecution(executionMinutes: Int) { - var milliToMinutes = 60000 + val milliToMinutes = 60000 System.out.println("Wait while executing") for (i in 1.rangeTo(executionMinutes)) { - Thread.sleep((milliToMinutes*i).toLong()); - System.out.println("Executed: "+i.toString()+" minutes") + Thread.sleep((milliToMinutes * i).toLong()) + System.out.println("Executed: $i minutes") } System.out.println("Execution finished") } - - fun createTopics(topics: Map<String, Int>,replicationfactor: Short) { - - val newTopics = mutableSetOf<NewTopic>() - for (i in topics) { - val tops = NewTopic(i.key,i.value,replicationfactor) - newTopics.add(tops) - } - kafkaAdmin.createTopics(newTopics) - System.out.println("Topics created") - } - - fun deleteTopics(topics: List<String>) { - - var result = kafkaAdmin.deleteTopics(topics) - System.out.println(result.values().toString()) - - } - - fun getTopics(): ListTopicsResult? { - return kafkaAdmin.listTopics() - - } - - - fun start_workload_generator(wg: String, dim_value:Integer, uc_id: String){ - - } + fun runExperiment() {} } - diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/TopicManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/TopicManager.kt new file mode 100644 index 0000000000000000000000000000000000000000..a62a8d8254d740f8362ea284c27aa43a6fdee71c --- /dev/null +++ b/theodolite-quarkus/src/main/kotlin/theodolite/TopicManager.kt @@ -0,0 +1,57 @@ +package theodolite + +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.AdminClientConfig +import org.apache.kafka.clients.admin.ListTopicsResult +import org.apache.kafka.clients.admin.NewTopic + +class TopicManager(boostrapIp: String) { + val props = hashMapOf<String, Any>(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to boostrapIp) + lateinit var kafkaAdmin: AdminClient + + init { + try { + kafkaAdmin = AdminClient.create(props) + } catch (e : Exception) { + System.out.println(e.toString()) + } + } + + fun createTopics(topics: Map<String, Int>, replicationfactor: Short) { + + val newTopics = mutableSetOf<NewTopic>() + for (i in topics) { + val tops = NewTopic(i.key, i.value, replicationfactor) + newTopics.add(tops) + } + kafkaAdmin.createTopics(newTopics) + System.out.println("Topics created") + } + + fun createTopics(topics: List<String>, numPartitions: Int, replicationfactor: Short) { + + val newTopics = mutableSetOf<NewTopic>() + for (i in topics) { + val tops = NewTopic(i, numPartitions, replicationfactor) + newTopics.add(tops) + } + kafkaAdmin.createTopics(newTopics) + System.out.println("Creation of $topics started") + } + + fun deleteTopics(topics: List<String>) { + + val result = kafkaAdmin.deleteTopics(topics) + + try { + result.all().get() + } catch (ex:Exception) { + System.out.println(ex.toString()) + } + System.out.println("Topics deleted") + } + + fun getTopics(): ListTopicsResult? { + return kafkaAdmin.listTopics() + } +}