diff --git a/helm/templates/kafka/kafka-cluster.yaml b/helm/templates/kafka/kafka-cluster.yaml index 1ff89396513a134e553bc4b97f771822f52ac2ed..f1a58077a78865c624706531b58c3150feeb83ae 100644 --- a/helm/templates/kafka/kafka-cluster.yaml +++ b/helm/templates/kafka/kafka-cluster.yaml @@ -39,6 +39,11 @@ spec: nodeSelectorTerms: {{- toYaml . | nindent 16 }} {{- end}} + {{- with .Values.strimzi.kafka.resources}} + resources: + {{- toYaml . | nindent 6 }} + {{- end}} + zookeeper: {{- with .Values.strimzi.zookeeper.replicas }} diff --git a/helm/values.yaml b/helm/values.yaml index 765f8e4e6bd0a0f9d59dc812d4b7a01d134e10b0..e3e2143a2694a58be7fb4f48efe0723fc05fb0f7 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -174,6 +174,7 @@ strimzi: "-Xmx": "512M" "-Xms": "512M" nodeSelectorTerms: [] + resources: {} zookeeper: replicas: 3 diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSender.java index 77706d824808132eaa7212194de0d69c346e4eba..f740c3696878516f29d0e06ba879cd23010a157b 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSender.java @@ -84,8 +84,9 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< final String json = this.gson.toJson(message); final HttpRequest request = HttpRequest.newBuilder() .uri(this.uri) - .timeout(this.connectionTimeout) .POST(HttpRequest.BodyPublishers.ofString(json)) + .header("Content-Type", "application/json") + .timeout(this.connectionTimeout) .build(); final BodyHandler<Void> bodyHandler = BodyHandlers.discarding(); // final BodyHandler<String> bodyHandler = BodyHandlers.ofString(); diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java index 1f35d592ed9b2b1507eb5c30090d392d37ed7c1e..d95d9b3343835f8348af15c3d00c34ef807d4501 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java @@ -18,7 +18,7 @@ import titan.ccp.model.records.ActivePowerRecord; public class PipelineFactory extends AbstractPipelineFactory { public static final String SINK_TYPE_KEY = "sink.type"; - + private final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY)); public PipelineFactory(final Configuration configuration) { @@ -52,7 +52,8 @@ public class PipelineFactory extends AbstractPipelineFactory { protected void registerCoders(final CoderRegistry registry) { registry.registerCoderForClass( ActivePowerRecord.class, - AvroCoder.of(ActivePowerRecord.SCHEMA$)); + // AvroCoder.of(ActivePowerRecord.SCHEMA$)); + AvroCoder.of(ActivePowerRecord.class, false)); } public static Function<Configuration, AbstractPipelineFactory> factory() { diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java b/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java index 6de0b8f956c94af36cd70cf44ab691ff97e11ae9..375b2a6cba5256e0644b6beaf26d41e010089250 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java @@ -70,8 +70,10 @@ public class PipelineFactory extends AbstractPipelineFactory { @Override protected void registerCoders(final CoderRegistry registry) { - registry.registerCoderForClass(ActivePowerRecord.class, - AvroCoder.of(ActivePowerRecord.SCHEMA$)); + registry.registerCoderForClass( + ActivePowerRecord.class, + // AvroCoder.of(ActivePowerRecord.SCHEMA$)); + AvroCoder.of(ActivePowerRecord.class, false)); registry.registerCoderForClass(StatsAggregation.class, SerializableCoder.of(StatsAggregation.class)); registry.registerCoderForClass(StatsAccumulator.class, diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java index de960d3d8466f9f420f002667df04d8a2fc64873..9c766e41254555647dd7ef1eed0417613b7c1629 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java @@ -91,7 +91,8 @@ public class PipelineFactory extends AbstractPipelineFactory { protected void registerCoders(final CoderRegistry registry) { registry.registerCoderForClass( ActivePowerRecord.class, - AvroCoder.of(ActivePowerRecord.SCHEMA$)); + // AvroCoder.of(ActivePowerRecord.SCHEMA$)); + AvroCoder.of(ActivePowerRecord.class, false)); registry.registerCoderForClass( HourOfDayKey.class, new HourOfDayKeyCoder()); diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java index a71c24eda5385b10a73b9eb65a83bba8363dd3e7..42d12d82026df0682f771b0cec5c1705ead83b2e 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java @@ -223,7 +223,8 @@ public class PipelineFactory extends AbstractPipelineFactory { protected void registerCoders(final CoderRegistry registry) { registry.registerCoderForClass( ActivePowerRecord.class, - AvroCoder.of(ActivePowerRecord.class)); + // AvroCoder.of(ActivePowerRecord.SCHEMA$)); + AvroCoder.of(ActivePowerRecord.class, false)); registry.registerCoderForClass( AggregatedActivePowerRecord.class, new AggregatedActivePowerRecordCoder()); diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/Action.kt b/theodolite/src/main/kotlin/theodolite/benchmark/Action.kt index 35efebdc0fb2a3748660cb76cdd5499b4ca5f622..8bd16d04d6a5e5ef3f362ff7d5611bf73e367a7e 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/Action.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/Action.kt @@ -23,7 +23,7 @@ class Action { timeout = exec.timeoutSeconds, command = exec.command ) - if(exitCode != 0){ + if (exitCode != 0){ throw ActionCommandFailedException("Error while executing action, finished with exit code $exitCode") } } @@ -38,7 +38,7 @@ class ActionSelector { @JsonDeserialize @RegisterForReflection class PodSelector { - lateinit var matchLabels: MutableMap<String, String> + lateinit var matchLabels: Map<String, String> } @JsonDeserialize @RegisterForReflection diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt b/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt index a4345c43ac6a75667c3c3e85c8534697193e1458..9f0578f7d1456d823a29049daae6dbe886c95e2a 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt @@ -33,7 +33,7 @@ class ActionCommand(val client: NamespacedKubernetesClient) { * @return the exit code of this executed command */ fun exec( - matchLabels: MutableMap<String, String>, + matchLabels: Map<String, String>, command: Array<String>, timeout: Long = Configuration.TIMEOUT_SECONDS, container: String = "" @@ -58,7 +58,7 @@ class ActionCommand(val client: NamespacedKubernetesClient) { val latchTerminationStatus = execLatch.await(timeout, TimeUnit.SECONDS) if (!latchTerminationStatus) { - throw ActionCommandFailedException("Latch could not terminate within specified time") + throw ActionCommandFailedException("Timeout while running action command") } execWatch.close() } catch (e: Exception) { @@ -112,7 +112,7 @@ class ActionCommand(val client: NamespacedKubernetesClient) { * it can take a while until the status is ready and the pod can be selected. * @return the name of the pod or throws [ActionCommandFailedException] */ - fun getPodName(matchLabels: MutableMap<String, String>, tries: Int): String { + fun getPodName(matchLabels: Map<String, String>, tries: Int): String { for (i in 1..tries) { try { @@ -125,7 +125,7 @@ class ActionCommand(val client: NamespacedKubernetesClient) { throw ActionCommandFailedException("Couldn't find any pod that matches the specified labels.") } - private fun getPodName(matchLabels: MutableMap<String, String>): String { + private fun getPodName(matchLabels: Map<String, String>): String { return try { val podNames = this.client .pods() diff --git a/theodolite/src/main/kotlin/theodolite/evaluation/AnalysisExecutor.kt b/theodolite/src/main/kotlin/theodolite/evaluation/AnalysisExecutor.kt index be3e48be406b631e03ca2fd32909a442b592f259..b3cc174d2945bf13bc1cc29d4e60d8c9bfbaf7eb 100644 --- a/theodolite/src/main/kotlin/theodolite/evaluation/AnalysisExecutor.kt +++ b/theodolite/src/main/kotlin/theodolite/evaluation/AnalysisExecutor.kt @@ -38,7 +38,7 @@ class AnalysisExecutor( try { val ioHandler = IOHandler() - val resultsFolder: String = ioHandler.getResultFolderURL() + val resultsFolder = ioHandler.getResultFolderURL() val fileURL = "${resultsFolder}exp${executionId}_${load.get()}_${res.get()}_${slo.sloType.toSlug()}" val prometheusData = executionIntervals diff --git a/theodolite/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt b/theodolite/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt index e54d79fe0f95b9f6079bd4295a74e81250b73a90..b6a1857cba513f663876f88d7a7d69ad02c0bc40 100644 --- a/theodolite/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt +++ b/theodolite/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt @@ -45,15 +45,16 @@ class MetricFetcher(private val prometheusURL: String, private val offset: Durat ) while (counter < RETRIES) { + logger.info { "Request collected metrics from Prometheus for interval [$offsetStart,$offsetEnd]." } val response = get("$prometheusURL/api/v1/query_range", params = parameter, timeout = TIMEOUT) if (response.statusCode != 200) { val message = response.jsonObject.toString() - logger.warn { "Could not connect to Prometheus: $message. Retrying now." } + logger.warn { "Could not connect to Prometheus: $message. Retry $counter/$RETRIES." } counter++ } else { val values = parseValues(response) if (values.data?.result.isNullOrEmpty()) { - throw NoSuchFieldException("Empty query result: $values between $start and $end for query $query.") + throw NoSuchFieldException("Empty query result: $values between for query '$query' in interval [$offsetStart,$offsetEnd] .") } return parseValues(response) } diff --git a/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt b/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt index 6dcfb582655ff9295aedd63d8c30cbac7daae2b3..c20b2ba87e386dc7c0a14245e03bedfb067720e6 100644 --- a/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt +++ b/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt @@ -190,7 +190,7 @@ class BenchmarkStateChecker( } } -private fun <K, V> MutableMap<K, V>.containsMatchLabels(matchLabels: MutableMap<V, V>): Boolean { +private fun <K, V> Map<K, V>.containsMatchLabels(matchLabels: Map<V, V>): Boolean { for (kv in matchLabels) { if (kv.value != this[kv.key as K]) { return false diff --git a/theodolite/src/main/kotlin/theodolite/execution/operator/ClusterSetup.kt b/theodolite/src/main/kotlin/theodolite/execution/operator/ClusterSetup.kt index 885315df6eda0d91a27567720056738b997a8ec1..e67be01ea80178b6d6bfb01b32bfd28c111addb9 100644 --- a/theodolite/src/main/kotlin/theodolite/execution/operator/ClusterSetup.kt +++ b/theodolite/src/main/kotlin/theodolite/execution/operator/ClusterSetup.kt @@ -1,13 +1,17 @@ package theodolite.execution.operator +import io.fabric8.kubernetes.client.KubernetesClientException import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.fabric8.kubernetes.client.dsl.MixedOperation import io.fabric8.kubernetes.client.dsl.Resource +import mu.KotlinLogging import theodolite.execution.Shutdown import theodolite.k8s.K8sContextFactory import theodolite.k8s.ResourceByLabelHandler import theodolite.model.crd.* +private val logger = KotlinLogging.logger {} + class ClusterSetup( private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, Resource<ExecutionCRD>>, private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, Resource<BenchmarkCRD>>, @@ -75,10 +79,15 @@ class ClusterSetup( labelName = "app.kubernetes.io/created-by", labelValue = "theodolite" ) - resourceRemover.removeCR( - labelName = "app.kubernetes.io/created-by", - labelValue = "theodolite", - context = serviceMonitorContext - ) + try { + resourceRemover.removeCR( + labelName = "app.kubernetes.io/created-by", + labelValue = "theodolite", + context = serviceMonitorContext + ) + } catch (e: KubernetesClientException) { + logger.warn { "Service monitors could not be cleaned up. It may be that service monitors are not registered by the Kubernetes API."} + logger.debug { "Error is: ${e.message}" } + } } } \ No newline at end of file diff --git a/theodolite/src/test/kotlin/theodolite/benchmark/ActionCommandTest.kt b/theodolite/src/test/kotlin/theodolite/benchmark/ActionCommandTest.kt index 0e40fca5caf9fe721c547e09d2ba22c25860a1bf..47f0e52f45e46e3cda093ff1b9722071f22ef7e8 100644 --- a/theodolite/src/test/kotlin/theodolite/benchmark/ActionCommandTest.kt +++ b/theodolite/src/test/kotlin/theodolite/benchmark/ActionCommandTest.kt @@ -102,7 +102,7 @@ class ActionCommandTest { val action = Action() action.selector = ActionSelector() action.selector.pod = PodSelector() - action.selector.pod.matchLabels = mutableMapOf("app" to "pod") + action.selector.pod.matchLabels = mapOf("app" to "pod") action.exec = Command() action.exec.command = arrayOf("ls") action.exec.timeoutSeconds = 10L @@ -118,7 +118,7 @@ class ActionCommandTest { val action = Action() action.selector = ActionSelector() action.selector.pod = PodSelector() - action.selector.pod.matchLabels = mutableMapOf("app" to "pod") + action.selector.pod.matchLabels = mapOf("app" to "pod") action.exec = Command() action.exec.command = arrayOf("error-command") action.exec.timeoutSeconds = 10L