diff --git a/CITATION.cff b/CITATION.cff index 04640de442f4458b09e11ce3d2939c850f594556..b6ca4542010b83e12206fbc0d9841683b43e1d57 100644 --- a/CITATION.cff +++ b/CITATION.cff @@ -8,7 +8,7 @@ authors: given-names: Wilhelm orcid: "https://orcid.org/0000-0001-6625-4335" title: Theodolite -version: "0.6.3" +version: "0.6.4" repository-code: "https://github.com/cau-se/theodolite" license: "Apache-2.0" doi: "10.1016/j.bdr.2021.100209" diff --git a/codemeta.json b/codemeta.json index 832b570681afb143978698fd47dad5d2835c700b..948a34628ec919e2492b61e3ac9997392dc5e030 100644 --- a/codemeta.json +++ b/codemeta.json @@ -8,7 +8,7 @@ "dateModified": "2022-01-24", "downloadUrl": "https://github.com/cau-se/theodolite/releases", "name": "Theodolite", - "version": "0.6.3", + "version": "0.6.4", "description": "Theodolite is a framework for benchmarking the horizontal and vertical scalability of cloud-native applications.", "developmentStatus": "active", "relatedLink": [ diff --git a/docs/creating-a-benchmark.md b/docs/creating-a-benchmark.md index fde8ba0759407ddea8befc18e244784a9ba34c1f..b09c989e59d910fc352af9d1c5690b224e3346e6 100644 --- a/docs/creating-a-benchmark.md +++ b/docs/creating-a-benchmark.md @@ -42,7 +42,7 @@ spec: properties: loadGenMaxRecords: "150000" kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" + bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092" topics: - name: "input" numPartitions: 40 @@ -54,7 +54,7 @@ spec: ## System under Test (SUT), Load Generator and Infrastructure -In Thedolite, the system under test (SUT), the load generator as well as additional infrastructure (e.g., a middleware) are described by Kubernetes resources files. +In Theodolite, the system under test (SUT), the load generator as well as additional infrastructure (e.g., a middleware) are described by Kubernetes resources files. 
All resources defined for the SUT and the load generator are started and stopped for each SLO experiment, with SUT resources being started before the load generator. Infrastructure resources live over the entire duration of a benchmark run. They avoid time-consuming recreation of software components like middlewares, but should be used with caution to not let previous SLO experiments influence latte ones. diff --git a/docs/index.yaml b/docs/index.yaml index 509844ab0bc371d29302f90f69e769cd52a8e11b..3e0de103a78f3529d314727ed59be3dcdc333fc9 100644 --- a/docs/index.yaml +++ b/docs/index.yaml @@ -1,6 +1,41 @@ apiVersion: v1 entries: theodolite: + - apiVersion: v2 + appVersion: 0.6.4 + created: "2022-02-16T16:09:11.967649304+01:00" + dependencies: + - condition: grafana.enabled + name: grafana + repository: https://grafana.github.io/helm-charts + version: 6.17.5 + - condition: kube-prometheus-stack.enabled + name: kube-prometheus-stack + repository: https://prometheus-community.github.io/helm-charts + version: 20.0.1 + - condition: cp-helm-charts.enabled + name: cp-helm-charts + repository: https://soerenhenning.github.io/cp-helm-charts + version: 0.6.0 + - condition: kafka-lag-exporter.enabled + name: kafka-lag-exporter + repository: https://seanglover.com/kafka-lag-exporter/repo + version: 0.6.7 + description: Theodolite is a framework for benchmarking the horizontal and vertical + scalability of cloud-native applications. 
+ digest: 10156d9917233ffa297aab093532038667d25b2babb2b2058a0a32e1dccb0cca + home: https://www.theodolite.rocks + maintainers: + - email: soeren.henning@email.uni-kiel.de + name: Sören Henning + url: https://www.se.informatik.uni-kiel.de/en/team/soeren-henning-m-sc + name: theodolite + sources: + - https://github.com/cau-se/theodolite + type: application + urls: + - https://github.com/cau-se/theodolite/releases/download/v0.6.4/theodolite-0.6.4.tgz + version: 0.6.4 - apiVersion: v2 appVersion: 0.6.3 created: "2022-01-24T13:40:40.07330713+01:00" @@ -141,6 +176,41 @@ entries: urls: - https://github.com/cau-se/theodolite/releases/download/v0.6.0/theodolite-0.6.0.tgz version: 0.6.0 + - apiVersion: v2 + appVersion: 0.5.2 + created: "2022-02-16T15:43:43.534374597+01:00" + dependencies: + - condition: grafana.enabled + name: grafana + repository: https://grafana.github.io/helm-charts + version: 6.17.5 + - condition: kube-prometheus-stack.enabled + name: kube-prometheus-stack + repository: https://prometheus-community.github.io/helm-charts + version: 12.0.0 + - condition: cp-helm-charts.enabled + name: cp-helm-charts + repository: https://soerenhenning.github.io/cp-helm-charts + version: 0.6.0 + - condition: kafka-lag-exporter.enabled + name: kafka-lag-exporter + repository: https://seanglover.com/kafka-lag-exporter/repo + version: 0.6.6 + description: Theodolite is a framework for benchmarking the scalability stream + processing engines. 
+ digest: 72df752883d2161fdfc0e96bb90fe11f9c0ed4f71013e588ec170f2cbb178e9c + home: https://cau-se.github.io/theodolite + maintainers: + - email: soeren.henning@email.uni-kiel.de + name: Sören Henning + url: https://www.se.informatik.uni-kiel.de/en/team/soeren-henning-m-sc + name: theodolite + sources: + - https://github.com/cau-se/theodolite + type: application + urls: + - https://github.com/cau-se/theodolite/releases/download/v0.5.2/theodolite-0.5.2.tgz + version: 0.5.2 - apiVersion: v2 appVersion: 0.5.1 created: "2021-11-12T16:15:01.629937292+01:00" @@ -246,4 +316,4 @@ entries: urls: - https://github.com/cau-se/theodolite/releases/download/v0.4.0/theodolite-0.4.0.tgz version: 0.4.0 -generated: "2022-01-24T13:40:40.036786105+01:00" +generated: "2022-02-16T16:09:11.93111234+01:00" diff --git a/docs/installation.md b/docs/installation.md index a97e5ea499657cdc3c40f3c03a13c974b5a39bab..d1c7ac3d1dd68d244c556e1ade53b50330aec6ed 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -58,13 +58,13 @@ In cases, where you need to install multiple Theodolite instances, it's best to ### Installation with a release name other than `theodolite` -When using another release name than `theodolite`, make sure to adjust the Kafka Lag Exporter configuration of you `values.yaml` accordingly: +When using another release name than `theodolite`, make sure to adjust the Confluent Schema Registry configuration of your `values.yaml` accordingly: ```yaml -kafka-lag-exporter: - clusters: - - name: "<your-release-name>-cp-kafka" - bootstrapBrokers: "<your-release-name>-cp-kafka:9092" +cp-helm-charts: + cp-schema-registry: + kafka: + bootstrapServers: <your-release-name>-kafka-kafka-bootstrap:9092 ``` This seems unfortunately to be necessary as Helm does not let us inject values into dependency charts. 
diff --git a/docs/theodolite-benchmarks/load-generator.md b/docs/theodolite-benchmarks/load-generator.md index 17845c42d47e94a5b696dee1d774890de8d6fff1..e92238e988436ded5444c4ce669dcc84e4e1a2b3 100644 --- a/docs/theodolite-benchmarks/load-generator.md +++ b/docs/theodolite-benchmarks/load-generator.md @@ -47,7 +47,7 @@ The prebuilt container images can be configured with the following environment v | `PORT` | Port used for for coordination among load generator instances. | 5701 | | `PORT_AUTO_INCREMENT` | If set to true and the specified PORT is already used, use the next higher one. Useful if multiple instances should run on the same host, without configuring each instance individually. | true | | `CLUSTER_NAME_PREFIX` | Only required if unrelated load generators form a cluster. | theodolite-load-generation | -| `TARGET` | The target system the load generator send messages to. Valid values are: `kafka`, `http`. | `kafka` | +| `TARGET` | The target system the load generator sends messages to. Valid values are: `kafka`, `http` and `pubsub`. | `kafka` | | `KAFKA_BOOTSTRAP_SERVERS` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. See [Kafka producer config: `bootstrap.servers`](https://kafka.apache.org/documentation/#producerconfigs_bootstrap.servers) for more information. Only used if Kafka is set as `TARGET`. | `localhost:9092` | | `KAFKA_INPUT_TOPIC` | Name of the Kafka topic, which should receive the generated messages. Only used if Kafka is set as `TARGET`. | input | | `SCHEMA_REGISTRY_URL` | URL of the [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry). | `http://localhost:8081` | @@ -55,6 +55,9 @@ The prebuilt container images can be configured with the following environment v | `KAFKA_LINGER_MS` | Value for the Kafka producer configuration: [`linger.ms`](https://kafka.apache.org/documentation/#producerconfigs_linger.ms). Only used if Kafka is set as `TARGET`. 
| see Kafka producer config: [`linger.ms`](https://kafka.apache.org/documentation/#producerconfigs_linger.ms) | | `KAFKA_BUFFER_MEMORY` | Value for the Kafka producer configuration: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) | | `HTTP_URL` | The URL the load generator should post messages to. Only used if HTTP is set as `TARGET`. | | +| `PUBSUB_INPUT_TOPIC` | The Google Cloud Pub/Sub topic to write messages to. Only used if Pub/Sub is set as `TARGET`. | input | +| `PUBSUB_PROJECT` | The Google Cloud project this Pub/Sub topic is associated with. Only used if Pub/Sub is set as `TARGET`. | | +| `PUBSUB_EMULATOR_HOST` | A Pub/Sub emulator host. Only used if Pub/Sub is set as `TARGET`. | | | `NUM_SENSORS` | The amount of simulated sensors. | 10 | | `PERIOD_MS` | The time in milliseconds between generating two messages for the same sensor. With our Theodolite benchmarks, we apply an [open workload model](https://www.usenix.org/legacy/event/nsdi06/tech/full_papers/schroeder/schroeder.pdf) in which new messages are generated at a fixed rate, without considering the think time of the target server nor the time required for generating a message. | 1000 | | `VALUE` | The constant `valueInW` of an `ActivePowerRecord`. | 10 | @@ -64,10 +67,10 @@ Please note that there are some additional configuration options for benchmark [ ## Creating a custom load generator -To create a custom load generator, you need to import the [load-generator-commons](https://github.com/cau-se/theodolite/tree/master/theodolite-benchmarks/load-generator-commons) project. 
You can then create an instance of the `LoadGenerator` object and call its `run` method: +To create a custom load generator, you need to import the [load-generator-commons](https://github.com/cau-se/theodolite/tree/master/theodolite-benchmarks/load-generator-commons) project. You can then create an instance of the `LoadGenerator` populated with a default configuration, adjust it as desired, and start it by calling its `run` method: ```java -LoadGenerator loadGenerator = new LoadGenerator() +LoadGenerator loadGenerator = LoadGenerator.fromDefaults() .setClusterConfig(clusterConfig) .setLoadDefinition(new WorkloadDefinition( new KeySpace(key_prefix, numSensors), @@ -79,9 +82,8 @@ LoadGenerator loadGenerator = new LoadGenerator() loadGenerator.run(); ``` -Alternatively, you can also start with a load generator populated with a default configuration or created from environment variables and then adjust the `LoadGenerator` as desired: +Alternatively, you can also start with a `LoadGenerator` created from environment variables and, optionally, adjust it as desired: ```java -LoadGenerator loadGeneratorFromDefaults = LoadGenerator.fromDefaults() -LoadGenerator loadGeneratorFromEnv = LoadGenerator.fromEnvironment(); +LoadGenerator loadGenerator = LoadGenerator.fromEnvironment(); ``` diff --git a/helm/Chart.yaml b/helm/Chart.yaml index 27451ad55ce75592db9dc7550b1f81dced3951bc..973c985b5bdaa4d53390954017ed9176bb396f55 100644 --- a/helm/Chart.yaml +++ b/helm/Chart.yaml @@ -24,10 +24,11 @@ dependencies: version: 0.6.0 repository: https://soerenhenning.github.io/cp-helm-charts condition: cp-helm-charts.enabled - - name: kafka-lag-exporter - version: 0.6.7 - repository: https://lightbend.github.io/kafka-lag-exporter/repo/ - condition: kafka-lag-exporter.enabled + - name: strimzi-kafka-operator + version: 0.28.0 + repository: https://strimzi.io/charts/ + condition: strimzi.enabled + version: 0.7.0-SNAPSHOT diff --git a/helm/templates/grafana/dashboard-config-map.yaml 
b/helm/templates/grafana/dashboard-config-map.yaml index 0df01b20efa0fb1100fe4b7289b00b3058eb032f..cc9ec623f3ef71459ccad65128bf8cd65f0d6eb6 100644 --- a/helm/templates/grafana/dashboard-config-map.yaml +++ b/helm/templates/grafana/dashboard-config-map.yaml @@ -71,10 +71,10 @@ data: "steppedLine": false, "targets": [ { - "expr": "sum(cp_kafka_server_brokertopicmetrics_messagesinpersec_topic_input)", + "expr": "sum by (topic) (rate(kafka_server_brokertopicmetrics_messagesin_total{topic='input'}[30s]))", "format": "time_series", "intervalFactor": 1, - "legendFormat": "{{Messages In Per Second}}", + "legendFormat": "{{topic}}", "refId": "D" } ], @@ -162,10 +162,10 @@ data: "steppedLine": false, "targets": [ { - "expr": "sum(cp_kafka_server_brokertopicmetrics_messagesinpersec_topic_output)", + "expr": "sum by (topic) (rate(kafka_server_brokertopicmetrics_messagesin_total{topic='output'}[30s]))", "format": "time_series", "intervalFactor": 1, - "legendFormat": "{{Messages Out Per Second}}", + "legendFormat": "{{topic}}", "refId": "D" } ], @@ -253,7 +253,7 @@ data: "steppedLine": false, "targets": [ { - "expr": "sum by(group, topic) (kafka_consumergroup_group_lag >= 0)", + "expr": "sum by(consumergroup, topic) (kafka_consumergroup_lag >= 0)", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{topic}}", @@ -344,10 +344,10 @@ data: "steppedLine": false, "targets": [ { - "expr": "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))", + "expr": "sum by(consumergroup) (kafka_consumergroup_members >= 0)", "format": "time_series", "intervalFactor": 1, - "legendFormat": "instances", + "legendFormat": "{{consumergroup}}", "refId": "D" } ], @@ -436,7 +436,7 @@ data: "steppedLine": false, "targets": [ { - "expr": "sum by(group,topic) (kafka_consumergroup_group_offset >= 0)", + "expr": "sum by(consumergroup,topic) (kafka_consumergroup_current_offset{topic='input'} >= 0)", "format": "time_series", "intervalFactor": 1, "legendFormat": 
"{{topic}}", @@ -527,7 +527,7 @@ data: "steppedLine": false, "targets": [ { - "expr": "count by(group,topic) (kafka_consumergroup_group_offset >= 0)", + "expr": "sum by(topic) (kafka_topic_partitions >= 0)", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{topic}}", @@ -618,7 +618,7 @@ data: "steppedLine": false, "targets": [ { - "expr": "sum by(group,topic) (kafka_partition_latest_offset)", + "expr": "sum by(topic) (kafka_topic_partition_current_offset)", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{topic}}", diff --git a/helm/templates/kafka/kafka-client.yaml b/helm/templates/kafka/kafka-client.yaml index 02e16d33dfc9595dd16c41fa6bfe1404fd7889ab..1edf1b96c7326d6a3cfdb1a45640ec9d8d720fba 100644 --- a/helm/templates/kafka/kafka-client.yaml +++ b/helm/templates/kafka/kafka-client.yaml @@ -2,7 +2,6 @@ apiVersion: v1 kind: Pod metadata: - # name: {{ template "theodolite.fullname" . }}-kafka-client name: {{ template "theodolite.fullname" . }}-kafka-client spec: containers: @@ -12,6 +11,9 @@ spec: - sh - -c - "exec tail -f /dev/null" + env: + - name: BOOTSTRAP_SERVER + value: {{ template "theodolite.fullname" . }}-kafka-kafka-bootstrap:9092 {{- with .Values.kafkaClient.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} diff --git a/helm/templates/prometheus/prometheus.yaml b/helm/templates/prometheus/prometheus.yaml index 23a015250e19cc14550ce73e8162ba27f65be774..196d68487824d7d8e130c56d11cec2687304d7e6 100644 --- a/helm/templates/prometheus/prometheus.yaml +++ b/helm/templates/prometheus/prometheus.yaml @@ -5,11 +5,12 @@ metadata: name: {{ template "theodolite.fullname" . }}-prometheus spec: serviceAccountName: {{ template "theodolite.fullname" . 
}}-prometheus + podMonitorSelector: {} serviceMonitorSelector: {} resources: requests: memory: 400Mi - #scrapeInterval: 1s + scrapeInterval: 15s enableAdminAPI: true {{- with .Values.prometheus.nodeSelector }} nodeSelector: diff --git a/helm/templates/strimzi/kafka-cluster.yaml b/helm/templates/strimzi/kafka-cluster.yaml new file mode 100644 index 0000000000000000000000000000000000000000..0d7eccfd279c62f7d996a8e3e41a55a5ebdd4e96 --- /dev/null +++ b/helm/templates/strimzi/kafka-cluster.yaml @@ -0,0 +1,43 @@ +{{- if .Values.strimzi.enabled -}} +apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: {{ template "theodolite.fullname" . }}-kafka +spec: + kafka: + jmxOptions: {} + {{- with .Values.strimzi.kafka.listeners }} + listeners: + {{- toYaml . | nindent 6 }} + {{- end }} + {{- with .Values.strimzi.kafka.replicas }} + replicas: + {{- toYaml . | nindent 6 }} + {{- end }} + {{- with .Values.strimzi.kafka.config }} + config: + {{- toYaml . | nindent 6 }} + {{- end }} + {{- with .Values.strimzi.kafka.jvmOptions }} + jvmOptions: + {{- toYaml . | nindent 6 }} + {{- end }} + storage: + type: ephemeral + metricsConfig: + type: jmxPrometheusExporter + valueFrom: + configMapKeyRef: + name: {{ template "theodolite.fullname" . }}-kafka-metrics + key: kafka-metrics-config.yml + + kafkaExporter: {} + + zookeeper: + {{- with .Values.strimzi.zookeeper.replicas }} + replicas: + {{- toYaml . | nindent 6 }} + {{- end }} + storage: + type: ephemeral +{{- end }} \ No newline at end of file diff --git a/helm/templates/strimzi/kafka-exporter-podmonitor.yaml b/helm/templates/strimzi/kafka-exporter-podmonitor.yaml new file mode 100644 index 0000000000000000000000000000000000000000..41395dfd6a9ea9f74598ff89c4d9e2e7f8757ff0 --- /dev/null +++ b/helm/templates/strimzi/kafka-exporter-podmonitor.yaml @@ -0,0 +1,16 @@ +{{- if .Values.strimzi.enabled -}} +apiVersion: monitoring.coreos.com/v1 +kind: PodMonitor +metadata: + name: {{ template "theodolite.fullname" . 
}}-kafka-exporter-podmonitor + labels: + app: theodolite +spec: + selector: + selector: + matchLabels: + strimzi.io/name: {{ template "theodolite.fullname" . }}-kafka-kafka-exporter + podMetricsEndpoints: + - path: /metrics + port: tcp-prometheus +{{- end }} \ No newline at end of file diff --git a/helm/templates/strimzi/kafka-metrics-configmap.yaml b/helm/templates/strimzi/kafka-metrics-configmap.yaml new file mode 100644 index 0000000000000000000000000000000000000000..ad75bfa6ab9ec89a229125d167b897cb58744c42 --- /dev/null +++ b/helm/templates/strimzi/kafka-metrics-configmap.yaml @@ -0,0 +1,133 @@ +{{- if .Values.strimzi.enabled -}} +kind: ConfigMap +apiVersion: v1 +metadata: + name: {{ template "theodolite.fullname" . }}-kafka-metrics + labels: + app: strimzi +data: + kafka-metrics-config.yml: | + # See https://github.com/prometheus/jmx_exporter for more info about JMX Prometheus Exporter metrics + lowercaseOutputName: true + rules: + # Special cases and very specific rules + - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value + name: kafka_server_$1_$2 + type: GAUGE + labels: + clientId: "$3" + topic: "$4" + partition: "$5" + - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value + name: kafka_server_$1_$2 + type: GAUGE + labels: + clientId: "$3" + broker: "$4:$5" + - pattern: kafka.server<type=(.+), cipher=(.+), protocol=(.+), listener=(.+), networkProcessor=(.+)><>connections + name: kafka_server_$1_connections_tls_info + type: GAUGE + labels: + cipher: "$2" + protocol: "$3" + listener: "$4" + networkProcessor: "$5" + - pattern: kafka.server<type=(.+), clientSoftwareName=(.+), clientSoftwareVersion=(.+), listener=(.+), networkProcessor=(.+)><>connections + name: kafka_server_$1_connections_software + type: GAUGE + labels: + clientSoftwareName: "$2" + clientSoftwareVersion: "$3" + listener: "$4" + networkProcessor: "$5" + - pattern: "kafka.server<type=(.+), listener=(.+), 
networkProcessor=(.+)><>(.+):" + name: kafka_server_$1_$4 + type: GAUGE + labels: + listener: "$2" + networkProcessor: "$3" + - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+) + name: kafka_server_$1_$4 + type: GAUGE + labels: + listener: "$2" + networkProcessor: "$3" + # Some percent metrics use MeanRate attribute + # Ex) kafka.server<type=(KafkaRequestHandlerPool), name=(RequestHandlerAvgIdlePercent)><>MeanRate + - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>MeanRate + name: kafka_$1_$2_$3_percent + type: GAUGE + # Generic gauges for percents + - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>Value + name: kafka_$1_$2_$3_percent + type: GAUGE + - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*, (.+)=(.+)><>Value + name: kafka_$1_$2_$3_percent + type: GAUGE + labels: + "$4": "$5" + # Generic per-second counters with 0-2 key/value pairs + - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count + name: kafka_$1_$2_$3_total + type: COUNTER + labels: + "$4": "$5" + "$6": "$7" + - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count + name: kafka_$1_$2_$3_total + type: COUNTER + labels: + "$4": "$5" + - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count + name: kafka_$1_$2_$3_total + type: COUNTER + # Generic gauges with 0-2 key/value pairs + - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value + name: kafka_$1_$2_$3 + type: GAUGE + labels: + "$4": "$5" + "$6": "$7" + - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value + name: kafka_$1_$2_$3 + type: GAUGE + labels: + "$4": "$5" + - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value + name: kafka_$1_$2_$3 + type: GAUGE + # Emulate Prometheus 'Summary' metrics for the exported 'Histogram's. + # Note that these are missing the '_sum' metric! 
+ - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count + name: kafka_$1_$2_$3_count + type: COUNTER + labels: + "$4": "$5" + "$6": "$7" + - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile + name: kafka_$1_$2_$3 + type: GAUGE + labels: + "$4": "$5" + "$6": "$7" + quantile: "0.$8" + - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count + name: kafka_$1_$2_$3_count + type: COUNTER + labels: + "$4": "$5" + - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile + name: kafka_$1_$2_$3 + type: GAUGE + labels: + "$4": "$5" + quantile: "0.$6" + - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count + name: kafka_$1_$2_$3_count + type: COUNTER + - pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile + name: kafka_$1_$2_$3 + type: GAUGE + labels: + quantile: "0.$4" +{{- end }} \ No newline at end of file diff --git a/helm/templates/strimzi/kafka-podmonitor.yaml b/helm/templates/strimzi/kafka-podmonitor.yaml new file mode 100644 index 0000000000000000000000000000000000000000..522f23f305b53615251d80e8cb13eac50e5530ad --- /dev/null +++ b/helm/templates/strimzi/kafka-podmonitor.yaml @@ -0,0 +1,16 @@ +{{- if .Values.strimzi.enabled -}} +apiVersion: monitoring.coreos.com/v1 +kind: PodMonitor +metadata: + name: {{ template "theodolite.fullname" . }}-kafka-resources-metrics + labels: + app: theodolite +spec: + selector: + selector: + matchLabels: + strimzi.io/name: {{ template "theodolite.fullname" . 
}}-kafka-kafka + podMetricsEndpoints: + - path: /metrics + port: tcp-prometheus +{{- end }} \ No newline at end of file diff --git a/helm/templates/theodolite/role.yaml b/helm/templates/theodolite/role.yaml index 43ee0e43d6974cd95548df32d6c4b1df8f3e497e..ba5a223b6527df94b64fac3574ee5f90fdb3903b 100644 --- a/helm/templates/theodolite/role.yaml +++ b/helm/templates/theodolite/role.yaml @@ -38,6 +38,7 @@ rules: - monitoring.coreos.com resources: - servicemonitors + - podmonitors verbs: - update - delete diff --git a/helm/values.yaml b/helm/values.yaml index ba58b040974886518ab111d668cb0db1140b2eb8..797098b1a2316389134827cfd6be37c0aaf3c4e6 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -64,7 +64,7 @@ cp-helm-charts: ## Zookeeper ## ------------------------------------------------------ cp-zookeeper: - enabled: true + enabled: false nodeSelector: {} servers: 3 image: confluentinc/cp-zookeeper @@ -81,7 +81,7 @@ cp-helm-charts: ## Kafka ## ------------------------------------------------------ cp-kafka: - enabled: true + enabled: false nodeSelector: {} brokers: 3 image: confluentinc/cp-enterprise-kafka @@ -137,6 +137,9 @@ cp-helm-charts: nodePort: 30099 annotations: {} + kafka: + bootstrapServers: theodolite-kafka-kafka-bootstrap:9092 + cp-kafka-rest: enabled: false @@ -149,29 +152,30 @@ cp-helm-charts: cp-control-center: enabled: false - -### -# Kafka Lag Exporter -### -kafka-lag-exporter: - enabled: true - image: - pullPolicy: IfNotPresent - nodeSelector: {} - - clusters: - - name: "theodolite-cp-kafka" - bootstrapBrokers: "theodolite-cp-kafka:9092" - ## The interval between refreshing metrics pollIntervalSeconds: 15 - prometheus: - serviceMonitor: - enabled: true - interval: "5s" - additionalLabels: - appScope: titan-ccp +strimzi: + enabled: true + kafka: + listeners: + - name: plain + port: 9092 + type: internal + tls: false + replicas: 3 + config: + "message.max.bytes": "134217728" # 128 MB + "replica.fetch.max.bytes": "134217728" #128 MB + 
"auto.create.topics.enable": false + "log.retention.ms": "7200000" # 2h + "metrics.sample.window.ms": "5000" #5s + jvmOptions: + "-Xmx": "512M" + "-Xms": "512M" + + zookeeper: + replicas: 3 ### diff --git a/slo-checker/record-lag/app/main.py b/slo-checker/record-lag/app/main.py index 621fa0cfc9c27e809fd92752de93f2795fa32c05..2e38354d45df57087a94e57d5c9ca412ed5534d3 100644 --- a/slo-checker/record-lag/app/main.py +++ b/slo-checker/record-lag/app/main.py @@ -24,7 +24,7 @@ elif os.getenv('LOG_LEVEL') == 'DEBUG': def calculate_slope_trend(results, warmup): d = [] for result in results: - group = result['metric']['group'] + group = result['metric']['consumergroup'] for value in result['values']: d.append({'group': group, 'timestamp': int( value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0}) diff --git a/slo-checker/record-lag/resources/test-1-rep-success.json b/slo-checker/record-lag/resources/test-1-rep-success.json index dfe11282720ebfcdd60582b7717da892bc85a923..09320c5f1eb0f96d847ddaa17cbd48c47a989155 100644 --- a/slo-checker/record-lag/resources/test-1-rep-success.json +++ b/slo-checker/record-lag/resources/test-1-rep-success.json @@ -3,7 +3,7 @@ [ { "metric": { - "group": "theodolite-uc1-application-0.0.1" + "consumergroup": "theodolite-uc1-application-0.0.1" }, "values": [ [ diff --git a/slo-checker/record-lag/resources/test-3-rep-success.json b/slo-checker/record-lag/resources/test-3-rep-success.json index cf483f42f3783aecd1f428ac7bbbe2090c4cade0..e752430e09e00e6ea12128df44f2d687ba037a7a 100644 --- a/slo-checker/record-lag/resources/test-3-rep-success.json +++ b/slo-checker/record-lag/resources/test-3-rep-success.json @@ -3,7 +3,7 @@ [ { "metric": { - "group": "theodolite-uc1-application-0.0.1" + "consumergroup": "theodolite-uc1-application-0.0.1" }, "values": [ [ @@ -100,7 +100,7 @@ [ { "metric": { - "group": "theodolite-uc1-application-0.0.1" + "consumergroup": "theodolite-uc1-application-0.0.1" }, "values": [ [ @@ -193,7 +193,7 @@ [ { "metric": { - 
"group": "theodolite-uc1-application-0.0.1" + "consumergroup": "theodolite-uc1-application-0.0.1" }, "values": [ [ diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java index 03c5ca1daa7ffab71a4d08c04f677d7412e3a2be..3e94fb4c878401183f45ff384e39dd6bc0291a27 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java @@ -8,8 +8,8 @@ import org.slf4j.LoggerFactory; import titan.ccp.common.configuration.ServiceConfigurations; /** - * Abstraction of a Beam microservice. - * Encapsulates the corresponding {@link PipelineOptions} and the beam Runner. + * Abstraction of a Beam microservice. Encapsulates the corresponding {@link PipelineOptions} and + * the beam Runner. */ public class AbstractBeamService { @@ -20,26 +20,24 @@ public class AbstractBeamService { // Application Configurations private final Configuration config = ServiceConfigurations.createWithDefaults(); - private final String applicationName = - config.getString(ConfigurationKeys.APPLICATION_NAME); - + private final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); /** * Creates AbstractBeamService with options. 
*/ - public AbstractBeamService(final String[] args) { //NOPMD + public AbstractBeamService(final String[] args) { // NOPMD super(); LOGGER.info("Pipeline options:"); for (final String s : args) { LOGGER.info("{}", s); } - options = PipelineOptionsFactory.fromArgs(args).create(); - options.setJobName(applicationName); - LOGGER.info("Starting BeamService with PipelineOptions {}:", this.options.toString()); + this.options = PipelineOptionsFactory.fromArgs(args).create(); + this.options.setJobName(this.applicationName); + LOGGER.info("Starting BeamService with PipelineOptions: {}", this.options.toString()); } public Configuration getConfig() { - return config; + return this.config; } } diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java index 83336b5a4c2451ef4bffefbd60ad9d52fccd9c17..e513c3a0e3dffcb9881f389af5ee9f05c52a2b63 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java @@ -6,6 +6,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.kafka.common.serialization.Deserializer; /** * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}. @@ -13,8 +14,7 @@ import org.apache.beam.sdk.values.PCollection; * @param <K> Type of the Key. * @param <V> Type of the Value. 
*/ -public class KafkaGenericReader<K, V> extends - PTransform<PBegin, PCollection<KV<K, V>>> { +public class KafkaGenericReader<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> { private static final long serialVersionUID = 2603286150183186115L; private final PTransform<PBegin, PCollection<KV<K, V>>> reader; @@ -22,14 +22,12 @@ public class KafkaGenericReader<K, V> extends /** * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. */ - public KafkaGenericReader(final String bootstrapServer, final String inputTopic, - final Map<String, Object> consumerConfig, - final Class<? extends - org.apache.kafka.common.serialization.Deserializer<K>> - keyDeserializer, - final Class<? extends - org.apache.kafka.common.serialization.Deserializer<V>> - valueDeserializer) { + public KafkaGenericReader( + final String bootstrapServer, + final String inputTopic, + final Map<String, Object> consumerConfig, + final Class<? extends Deserializer<K>> keyDeserializer, + final Class<? extends Deserializer<V>> valueDeserializer) { super(); // Check if boostrap server and inputTopic are defined @@ -37,7 +35,7 @@ public class KafkaGenericReader<K, V> extends throw new IllegalArgumentException("bootstrapServer or inputTopic missing"); } - reader = + this.reader = KafkaIO.<K, V>read() .withBootstrapServers(bootstrapServer) .withTopic(inputTopic) diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.gradle index 4611062f1b09ff2dbad02f93b9cc7f9920c32f5e..5849bd93221794d135f1c6cb3bcb62d2174724b5 100644 --- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.gradle +++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.gradle @@ -24,7 +24,6 @@ dependencies { // These dependencies are used internally, and not exposed to consumers on their own compile classpath. 
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } - implementation 'com.google.code.gson:gson:2.8.2' implementation 'com.google.guava:guava:24.1-jre' implementation 'org.slf4j:slf4j-simple:1.7.25' implementation project(':beam-commons') diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.flink.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.flink.gradle index 258d1a82d002184fe96a9df19b7d99806da50d28..7671e602211b6d9e923a3b2a4c87f40fff84c6ec 100644 --- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.flink.gradle +++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.flink.gradle @@ -41,7 +41,6 @@ dependencies { implementation 'org.apache.kafka:kafka-clients:2.2.0' implementation 'com.google.guava:guava:30.1-jre' - implementation 'com.google.code.gson:gson:2.8.2' implementation 'org.slf4j:slf4j-simple:1.6.1' implementation project(':flink-commons') diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.kstreams.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.kstreams.gradle index 112ac662798d5a1e41f146014dd95bdaaba3a264..bf533915a8fdf4a712754857702373264a30f80a 100644 --- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.kstreams.gradle +++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.kstreams.gradle @@ -23,7 +23,6 @@ dependencies { implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'org.apache.kafka:kafka-streams:3.1.0' - implementation 'com.google.code.gson:gson:2.8.2' implementation 'com.google.guava:guava:24.1-jre' implementation 'org.slf4j:slf4j-simple:1.7.25' implementation project(':kstreams-commons') diff --git 
a/theodolite-benchmarks/definitions/uc1-flink/resources/jobmanager-deployment.yaml b/theodolite-benchmarks/definitions/uc1-flink/resources/jobmanager-deployment.yaml index 1f328b1cd553c8036e570d28b97795fb2b00ec81..f81b851aa935408112e2a2fadbb72046720068c6 100644 --- a/theodolite-benchmarks/definitions/uc1-flink/resources/jobmanager-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc1-flink/resources/jobmanager-deployment.yaml @@ -20,7 +20,7 @@ spec: image: ghcr.io/cau-se/theodolite-uc1-flink:latest env: - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" - name: COMMIT_INTERVAL_MS diff --git a/theodolite-benchmarks/definitions/uc1-flink/resources/taskmanager-deployment.yaml b/theodolite-benchmarks/definitions/uc1-flink/resources/taskmanager-deployment.yaml index c2266a4aeb21302262279f147e6512d5264e1dc1..9710d44cbb1ffb75e4d1ebafe1ffe60042587adc 100644 --- a/theodolite-benchmarks/definitions/uc1-flink/resources/taskmanager-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc1-flink/resources/taskmanager-deployment.yaml @@ -20,7 +20,7 @@ spec: image: ghcr.io/cau-se/theodolite-uc1-flink:latest env: - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" - name: COMMIT_INTERVAL_MS diff --git a/theodolite-benchmarks/definitions/uc1-flink/uc1-flink-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc1-flink/uc1-flink-benchmark-operator.yaml index 89bac41ee5c8dcefa628b3cb01052df5a1df9292..20953c2d1e64895417f4f5339a0f3820d78735ac 100644 --- a/theodolite-benchmarks/definitions/uc1-flink/uc1-flink-benchmark-operator.yaml +++ b/theodolite-benchmarks/definitions/uc1-flink/uc1-flink-benchmark-operator.yaml @@ -50,7 +50,7 @@ spec: properties: loadGenMaxRecords: "150000" 
kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" + bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092" topics: - name: "input" numPartitions: 40 diff --git a/theodolite-benchmarks/definitions/uc1-kstreams/resources/uc1-kstreams-deployment.yaml b/theodolite-benchmarks/definitions/uc1-kstreams/resources/uc1-kstreams-deployment.yaml index 171c3446db2719ee91bd8954233015316851fcf9..b4d2bfe738fd9c96d0219a825d5c4ef2cf5385f3 100644 --- a/theodolite-benchmarks/definitions/uc1-kstreams/resources/uc1-kstreams-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc1-kstreams/resources/uc1-kstreams-deployment.yaml @@ -21,7 +21,7 @@ spec: name: jmx env: - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" - name: JAVA_OPTS diff --git a/theodolite-benchmarks/definitions/uc1-kstreams/uc1-kstreams-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc1-kstreams/uc1-kstreams-benchmark-operator.yaml index fb5557c2df8b483164d3c1000717db4c7cface81..c340547c703c03a2e91738d4f53537938da97e0e 100644 --- a/theodolite-benchmarks/definitions/uc1-kstreams/uc1-kstreams-benchmark-operator.yaml +++ b/theodolite-benchmarks/definitions/uc1-kstreams/uc1-kstreams-benchmark-operator.yaml @@ -37,7 +37,7 @@ spec: properties: loadGenMaxRecords: "150000" kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" + bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092" topics: - name: "input" numPartitions: 40 diff --git a/theodolite-benchmarks/definitions/uc1-kstreams/uc1-kstreams-benchmark-standalone.yaml b/theodolite-benchmarks/definitions/uc1-kstreams/uc1-kstreams-benchmark-standalone.yaml index 5aaf87e724a4e8c728c3c15b998cb927ff57f3d5..545474b1c556a8185d3725c51908f996b89e91e2 100644 --- a/theodolite-benchmarks/definitions/uc1-kstreams/uc1-kstreams-benchmark-standalone.yaml +++ 
b/theodolite-benchmarks/definitions/uc1-kstreams/uc1-kstreams-benchmark-standalone.yaml @@ -32,7 +32,7 @@ loadTypes: properties: loadGenMaxRecords: "150000" kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" + bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092" topics: - name: "input" numPartitions: 40 diff --git a/theodolite-benchmarks/definitions/uc1-load-generator/resources/uc1-load-generator-deployment.yaml b/theodolite-benchmarks/definitions/uc1-load-generator/resources/uc1-load-generator-deployment.yaml index 9f9ccc6ae39407bb1f027e1e23cb152944b869e0..65048a97d5de3d831f782db329e295a5e5ceb727 100644 --- a/theodolite-benchmarks/definitions/uc1-load-generator/resources/uc1-load-generator-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc1-load-generator/resources/uc1-load-generator-deployment.yaml @@ -27,6 +27,6 @@ spec: - name: KUBERNETES_DNS_NAME value: "titan-ccp-load-generator.$(KUBERNETES_NAMESPACE).svc.cluster.local" - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" diff --git a/theodolite-benchmarks/definitions/uc2-flink/resources/jobmanager-deployment.yaml b/theodolite-benchmarks/definitions/uc2-flink/resources/jobmanager-deployment.yaml index 87ea174f71c592bbffab4e5fc9ce6e3963596b9c..ab8816fbf82dde2d71705f5d6977d045fe20f4f5 100644 --- a/theodolite-benchmarks/definitions/uc2-flink/resources/jobmanager-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc2-flink/resources/jobmanager-deployment.yaml @@ -20,7 +20,7 @@ spec: image: ghcr.io/cau-se/theodolite-uc2-flink:latest env: - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" - name: COMMIT_INTERVAL_MS diff --git a/theodolite-benchmarks/definitions/uc2-flink/resources/taskmanager-deployment.yaml 
b/theodolite-benchmarks/definitions/uc2-flink/resources/taskmanager-deployment.yaml index c37df972a334a4a0e27f0420030f99f1dff15b53..b9a917ee825637988cf6902186d99421aac3d671 100644 --- a/theodolite-benchmarks/definitions/uc2-flink/resources/taskmanager-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc2-flink/resources/taskmanager-deployment.yaml @@ -20,7 +20,7 @@ spec: image: ghcr.io/cau-se/theodolite-uc2-flink:latest env: - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" - name: COMMIT_INTERVAL_MS diff --git a/theodolite-benchmarks/definitions/uc2-flink/uc2-flink-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc2-flink/uc2-flink-benchmark-operator.yaml index 206fbf9683659fcc074341d7077da04c36909b75..3020bb317c8b500562f1edcf2dc770f1288a8788 100644 --- a/theodolite-benchmarks/definitions/uc2-flink/uc2-flink-benchmark-operator.yaml +++ b/theodolite-benchmarks/definitions/uc2-flink/uc2-flink-benchmark-operator.yaml @@ -50,7 +50,7 @@ spec: properties: loadGenMaxRecords: "150000" kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" + bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092" topics: - name: "input" numPartitions: 40 diff --git a/theodolite-benchmarks/definitions/uc2-kstreams/resources/uc2-kstreams-deployment.yaml b/theodolite-benchmarks/definitions/uc2-kstreams/resources/uc2-kstreams-deployment.yaml index e07bb3f9e536655712c06a004c5d1fb60ffa67e0..86932cafb26248736fbe060ba7f23ee5dded412d 100644 --- a/theodolite-benchmarks/definitions/uc2-kstreams/resources/uc2-kstreams-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc2-kstreams/resources/uc2-kstreams-deployment.yaml @@ -21,7 +21,7 @@ spec: name: jmx env: - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: 
"http://theodolite-cp-schema-registry:8081" - name: JAVA_OPTS diff --git a/theodolite-benchmarks/definitions/uc2-kstreams/uc2-kstreams-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc2-kstreams/uc2-kstreams-benchmark-operator.yaml index 0db22fa95f46d1cb484fa1a7730b8b6801dac67c..b9f2b14e369b3c8e241be62c04bd480f38d847dc 100644 --- a/theodolite-benchmarks/definitions/uc2-kstreams/uc2-kstreams-benchmark-operator.yaml +++ b/theodolite-benchmarks/definitions/uc2-kstreams/uc2-kstreams-benchmark-operator.yaml @@ -37,7 +37,7 @@ spec: properties: loadGenMaxRecords: "150000" kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" + bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092" topics: - name: "input" numPartitions: 40 diff --git a/theodolite-benchmarks/definitions/uc2-kstreams/uc2-kstreams-benchmark-standalone.yaml b/theodolite-benchmarks/definitions/uc2-kstreams/uc2-kstreams-benchmark-standalone.yaml index 67376d76bf0a7cc4cd47563a1d8da8dc0aa3b944..572d2b3ceac5fa43a324d4c687f72ddd1e1cdb78 100644 --- a/theodolite-benchmarks/definitions/uc2-kstreams/uc2-kstreams-benchmark-standalone.yaml +++ b/theodolite-benchmarks/definitions/uc2-kstreams/uc2-kstreams-benchmark-standalone.yaml @@ -33,7 +33,7 @@ loadTypes: properties: loadGenMaxRecords: "150000" kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" + bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092" topics: - name: "input" numPartitions: 40 diff --git a/theodolite-benchmarks/definitions/uc2-load-generator/resources/uc2-load-generator-deployment.yaml b/theodolite-benchmarks/definitions/uc2-load-generator/resources/uc2-load-generator-deployment.yaml index dfc0af71543c15b12b5c850919feb0e0a4f52f28..d758c66f88fa93c98258febf6c5e6a35f7171820 100644 --- a/theodolite-benchmarks/definitions/uc2-load-generator/resources/uc2-load-generator-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc2-load-generator/resources/uc2-load-generator-deployment.yaml @@ -27,6 +27,6 @@ spec: - name: 
KUBERNETES_DNS_NAME value: "titan-ccp-load-generator.$(KUBERNETES_NAMESPACE).svc.cluster.local" - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" diff --git a/theodolite-benchmarks/definitions/uc3-flink/resources/jobmanager-deployment.yaml b/theodolite-benchmarks/definitions/uc3-flink/resources/jobmanager-deployment.yaml index d01123b13fe2d63637ee4000051091a99bad0546..2b3f33e3748dab0fd62747ff1f0caeb768dcd4e2 100644 --- a/theodolite-benchmarks/definitions/uc3-flink/resources/jobmanager-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc3-flink/resources/jobmanager-deployment.yaml @@ -20,7 +20,7 @@ spec: image: ghcr.io/cau-se/theodolite-uc3-flink:latest env: - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" - name: COMMIT_INTERVAL_MS diff --git a/theodolite-benchmarks/definitions/uc3-flink/resources/taskmanager-deployment.yaml b/theodolite-benchmarks/definitions/uc3-flink/resources/taskmanager-deployment.yaml index 495f97817e43d692c30fe898c4ef3118cae682d7..cc1efa23c32220c7c664d8aaa4669f3af6492d15 100644 --- a/theodolite-benchmarks/definitions/uc3-flink/resources/taskmanager-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc3-flink/resources/taskmanager-deployment.yaml @@ -20,7 +20,7 @@ spec: image: ghcr.io/cau-se/theodolite-uc3-flink:latest env: - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" - name: COMMIT_INTERVAL_MS diff --git a/theodolite-benchmarks/definitions/uc3-flink/uc3-flink-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc3-flink/uc3-flink-benchmark-operator.yaml index 
47b64d9890fc0f300ee1bd8e67acbdf7c8c4e4f9..0b6e2490f3b58e5c843f2719b24378b46406c6a9 100644 --- a/theodolite-benchmarks/definitions/uc3-flink/uc3-flink-benchmark-operator.yaml +++ b/theodolite-benchmarks/definitions/uc3-flink/uc3-flink-benchmark-operator.yaml @@ -50,7 +50,7 @@ spec: properties: loadGenMaxRecords: "150000" kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" + bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092" topics: - name: "input" numPartitions: 40 diff --git a/theodolite-benchmarks/definitions/uc3-kstreams/resources/uc3-kstreams-deployment.yaml b/theodolite-benchmarks/definitions/uc3-kstreams/resources/uc3-kstreams-deployment.yaml index e3f63fae9e245e6116e0fe451480d9bc74b36433..0b58c15fd123c9d06e441084063eae40d20cc48a 100644 --- a/theodolite-benchmarks/definitions/uc3-kstreams/resources/uc3-kstreams-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc3-kstreams/resources/uc3-kstreams-deployment.yaml @@ -21,7 +21,7 @@ spec: name: jmx env: - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" - name: JAVA_OPTS diff --git a/theodolite-benchmarks/definitions/uc3-kstreams/uc3-kstreams-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc3-kstreams/uc3-kstreams-benchmark-operator.yaml index 25374ad92a32782857cea5924ea6482060832eac..1db619303fe1bb108205654c2245b8032b723c15 100644 --- a/theodolite-benchmarks/definitions/uc3-kstreams/uc3-kstreams-benchmark-operator.yaml +++ b/theodolite-benchmarks/definitions/uc3-kstreams/uc3-kstreams-benchmark-operator.yaml @@ -37,7 +37,7 @@ spec: properties: loadGenMaxRecords: "150000" kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" + bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092" topics: - name: "input" numPartitions: 40 diff --git a/theodolite-benchmarks/definitions/uc3-kstreams/uc3-kstreams-benchmark-standalone.yaml 
b/theodolite-benchmarks/definitions/uc3-kstreams/uc3-kstreams-benchmark-standalone.yaml index aa92913d2c992835078174747ea849ce296c3eb1..f879e0949e325e2e2cd830231170775935bda65d 100644 --- a/theodolite-benchmarks/definitions/uc3-kstreams/uc3-kstreams-benchmark-standalone.yaml +++ b/theodolite-benchmarks/definitions/uc3-kstreams/uc3-kstreams-benchmark-standalone.yaml @@ -33,7 +33,7 @@ loadTypes: properties: loadGenMaxRecords: "150000" kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" + bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092" topics: - name: "input" numPartitions: 40 diff --git a/theodolite-benchmarks/definitions/uc3-load-generator/resources/uc3-load-generator-deployment.yaml b/theodolite-benchmarks/definitions/uc3-load-generator/resources/uc3-load-generator-deployment.yaml index c1cad0b70fd82a5bbb43792ee79f9cf5cc71d95f..928e0a91b210e786b3d6b156e964a7fb9a7cc184 100644 --- a/theodolite-benchmarks/definitions/uc3-load-generator/resources/uc3-load-generator-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc3-load-generator/resources/uc3-load-generator-deployment.yaml @@ -27,6 +27,6 @@ spec: - name: KUBERNETES_DNS_NAME value: "titan-ccp-load-generator.$(KUBERNETES_NAMESPACE).svc.cluster.local" - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" diff --git a/theodolite-benchmarks/definitions/uc4-flink/resources/jobmanager-deployment.yaml b/theodolite-benchmarks/definitions/uc4-flink/resources/jobmanager-deployment.yaml index 032499ea498f8155fd80e42ec4cbdd850498b217..2f6eaecc4fd1aede1b9a5ff2341149d1aa8c5ccd 100644 --- a/theodolite-benchmarks/definitions/uc4-flink/resources/jobmanager-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc4-flink/resources/jobmanager-deployment.yaml @@ -20,7 +20,7 @@ spec: image: ghcr.io/cau-se/theodolite-uc4-flink:latest env: - name: KAFKA_BOOTSTRAP_SERVERS - 
value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" - name: COMMIT_INTERVAL_MS diff --git a/theodolite-benchmarks/definitions/uc4-flink/resources/taskmanager-deployment.yaml b/theodolite-benchmarks/definitions/uc4-flink/resources/taskmanager-deployment.yaml index 495f97817e43d692c30fe898c4ef3118cae682d7..cc1efa23c32220c7c664d8aaa4669f3af6492d15 100644 --- a/theodolite-benchmarks/definitions/uc4-flink/resources/taskmanager-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc4-flink/resources/taskmanager-deployment.yaml @@ -20,7 +20,7 @@ spec: image: ghcr.io/cau-se/theodolite-uc3-flink:latest env: - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" - name: COMMIT_INTERVAL_MS diff --git a/theodolite-benchmarks/definitions/uc4-flink/uc4-flink-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc4-flink/uc4-flink-benchmark-operator.yaml index 8a73f5b0f87198def7b152ea52008e3d4a1aa4ee..28ae937e964127ded0e34d637ab307fa08db8ec3 100644 --- a/theodolite-benchmarks/definitions/uc4-flink/uc4-flink-benchmark-operator.yaml +++ b/theodolite-benchmarks/definitions/uc4-flink/uc4-flink-benchmark-operator.yaml @@ -51,7 +51,7 @@ spec: loadGenMaxRecords: "150000" numSensors: "4.0" kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" + bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092" topics: - name: "input" numPartitions: 40 diff --git a/theodolite-benchmarks/definitions/uc4-kstreams/resources/uc4-kstreams-deployment.yaml b/theodolite-benchmarks/definitions/uc4-kstreams/resources/uc4-kstreams-deployment.yaml index 20e0872d262df46b5c213d9d529983f5f4155735..3c9a96e36ebf31397f91930426fc028be9d5a2c6 100644 --- a/theodolite-benchmarks/definitions/uc4-kstreams/resources/uc4-kstreams-deployment.yaml +++ 
b/theodolite-benchmarks/definitions/uc4-kstreams/resources/uc4-kstreams-deployment.yaml @@ -21,7 +21,7 @@ spec: name: jmx env: - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" - name: JAVA_OPTS diff --git a/theodolite-benchmarks/definitions/uc4-kstreams/uc4-kstreams-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc4-kstreams/uc4-kstreams-benchmark-operator.yaml index 655db2fd4122c9e0e844eed3bfe7c0a878c6d7ec..9ce6daa2dc14e8beecba1c43381defea6bba0d37 100644 --- a/theodolite-benchmarks/definitions/uc4-kstreams/uc4-kstreams-benchmark-operator.yaml +++ b/theodolite-benchmarks/definitions/uc4-kstreams/uc4-kstreams-benchmark-operator.yaml @@ -38,7 +38,7 @@ spec: loadGenMaxRecords: "150000" numSensors: "4.0" kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" + bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092" topics: - name: "input" numPartitions: 40 diff --git a/theodolite-benchmarks/definitions/uc4-kstreams/uc4-kstreams-benchmark-standalone.yaml b/theodolite-benchmarks/definitions/uc4-kstreams/uc4-kstreams-benchmark-standalone.yaml index 5c50b6f95d796941c0b2830549ef825f4a4ff6fb..afed6115102499f225b11c25633e07168a903ea8 100644 --- a/theodolite-benchmarks/definitions/uc4-kstreams/uc4-kstreams-benchmark-standalone.yaml +++ b/theodolite-benchmarks/definitions/uc4-kstreams/uc4-kstreams-benchmark-standalone.yaml @@ -34,7 +34,7 @@ loadTypes: loadGenMaxRecords: "150000" numSensors: "4.0" kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" + bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092" topics: - name: "input" numPartitions: 40 diff --git a/theodolite-benchmarks/definitions/uc4-load-generator/resources/uc4-load-generator-deployment.yaml b/theodolite-benchmarks/definitions/uc4-load-generator/resources/uc4-load-generator-deployment.yaml index 
7a69d13daae57b06c77f316da9aa953b21ac096b..0f64fc881b5beb31a6dad4ff2a755413049b96fa 100644 --- a/theodolite-benchmarks/definitions/uc4-load-generator/resources/uc4-load-generator-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc4-load-generator/resources/uc4-load-generator-deployment.yaml @@ -27,7 +27,7 @@ spec: - name: KUBERNETES_DNS_NAME value: "titan-ccp-load-generator.$(KUBERNETES_NAMESPACE).svc.cluster.local" - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" - name: NUM_NESTED_GROUPS diff --git a/theodolite-benchmarks/docker-test/uc1-beam-flink/test.sh b/theodolite-benchmarks/docker-test/uc1-beam-flink/test.sh index ebbecd1c5336c5dd907db11b8c8c45924e5924a8..7c7f11a94f42d56d91d383f27d58ad9a09a918e5 100755 --- a/theodolite-benchmarks/docker-test/uc1-beam-flink/test.sh +++ b/theodolite-benchmarks/docker-test/uc1-beam-flink/test.sh @@ -2,7 +2,7 @@ sleep 55s # to let the benchmark and produce some output docker-compose logs --tail 100 benchmark-taskmanager | - sed -n "s/^.*Key:\s\(\S*\), Value:\s\(\S*\).*$/\2/p" | + sed -n "s/^.*Record:\s\(\S*\)$/\1/p" | tee /dev/stderr | jq .identifier | sort | diff --git a/theodolite-benchmarks/docker-test/uc1-beam-samza/test.sh b/theodolite-benchmarks/docker-test/uc1-beam-samza/test.sh index ed17db3a44d5c4a8dacfbc956c2f36dd47503508..62327e860cb658741d0892052f5202df3f5b431e 100755 --- a/theodolite-benchmarks/docker-test/uc1-beam-samza/test.sh +++ b/theodolite-benchmarks/docker-test/uc1-beam-samza/test.sh @@ -2,7 +2,7 @@ sleep 55s # to let the benchmark and produce some output docker-compose logs --tail 100 benchmark | - sed -n "s/^.*Key:\s\(\S*\), Value:\s\(\S*\).*$/\2/p" | + sed -n "s/^.*Record:\s\(\S*\)$/\1/p" | tee /dev/stderr | jq .identifier | sort | diff --git a/theodolite-benchmarks/http-bridge/build.gradle b/theodolite-benchmarks/http-bridge/build.gradle index 
0377eefc76b456d8e0f94087b06d0c2689f977cf..6bc7c721894e36799015a0f2fb155c9b838238b8 100644 --- a/theodolite-benchmarks/http-bridge/build.gradle +++ b/theodolite-benchmarks/http-bridge/build.gradle @@ -24,6 +24,8 @@ dependencies { implementation project(':load-generator-commons') implementation 'io.javalin:javalin:4.3.0' + implementation 'io.micrometer:micrometer-core:1.8.3' + implementation 'io.micrometer:micrometer-registry-prometheus:1.8.3' implementation 'com.google.code.gson:gson:2.8.2' runtimeOnly 'org.slf4j:slf4j-simple:1.7.25' diff --git a/theodolite-benchmarks/http-bridge/src/main/java/theodolite/commons/httpbridge/JavalinWebServer.java b/theodolite-benchmarks/http-bridge/src/main/java/theodolite/commons/httpbridge/JavalinWebServer.java index c23a17588d661fc5d1c6e9eb294d2d29fc165675..e7073a421582d74009f4a89acfcfb816a689acf3 100644 --- a/theodolite-benchmarks/http-bridge/src/main/java/theodolite/commons/httpbridge/JavalinWebServer.java +++ b/theodolite-benchmarks/http-bridge/src/main/java/theodolite/commons/httpbridge/JavalinWebServer.java @@ -1,6 +1,10 @@ package theodolite.commons.httpbridge; import io.javalin.Javalin; +import io.javalin.plugin.metrics.MicrometerPlugin; +import io.micrometer.prometheus.PrometheusConfig; +import io.micrometer.prometheus.PrometheusMeterRegistry; +import io.prometheus.client.exporter.common.TextFormat; import java.util.Collection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,7 +18,9 @@ public class JavalinWebServer { private static final int HTTP_SUCCESS = 200; - private final Javalin app = Javalin.create(); + private final Javalin app; + + private final PrometheusMeterRegistry registry; private final String host; private final int port; @@ -28,6 +34,10 @@ public class JavalinWebServer { final int port) { this.host = host; this.port = port; + this.registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); + this.app = Javalin.create(config -> { + config.registerPlugin(new 
MicrometerPlugin(this.registry)); + }); this.configureRoutes(converters); } @@ -40,6 +50,9 @@ public class JavalinWebServer { ctx.status(HTTP_SUCCESS); }); } + this.app.get("/metrics", ctx -> ctx + .contentType(TextFormat.CONTENT_TYPE_004) + .result(this.registry.scrape())); } public void start() { diff --git a/theodolite-benchmarks/load-generator-commons/build.gradle b/theodolite-benchmarks/load-generator-commons/build.gradle index 2d8f77b5154b5b788e0729da69122b443740ce75..62542eeabc1cccb35fa9f71d1929b72956a56999 100644 --- a/theodolite-benchmarks/load-generator-commons/build.gradle +++ b/theodolite-benchmarks/load-generator-commons/build.gradle @@ -21,6 +21,9 @@ dependencies { implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'org.apache.kafka:kafka-streams:2.6.0' // TODO required? + implementation platform('com.google.cloud:libraries-bom:24.2.0') + implementation 'com.google.protobuf:protobuf-java-util' + implementation 'com.google.cloud:google-cloud-pubsub' // Use JUnit test framework testImplementation 'junit:junit:4.12' diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java index 7a60e271f04e396b2e0c69b1fcfee1d8a1ca8a7d..e94a11425eebc8180504a8a4f4ff582116623574 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java @@ -39,6 +39,12 @@ public final class ConfigurationKeys { public static final String HTTP_URL = "HTTP_URL"; + public static final String PUBSUB_INPUT_TOPIC = "PUBSUB_INPUT_TOPIC"; + + public static 
final String PUBSUB_PROJECT = "PUBSUB_PROJECT"; + + public static final String PUBSUB_EMULATOR_HOST = "PUBSUB_EMULATOR_HOST"; + private ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/EnvVarLoadGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/EnvVarLoadGeneratorFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..2901b68d8f3e6fa90cccfe15e7992aca67653f94 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/EnvVarLoadGeneratorFactory.java @@ -0,0 +1,138 @@ +package theodolite.commons.workloadgeneration; + +import java.net.URI; +import java.time.Duration; +import java.util.Objects; +import java.util.Properties; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import titan.ccp.model.records.ActivePowerRecord; + +class EnvVarLoadGeneratorFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(EnvVarLoadGeneratorFactory.class); + + public LoadGenerator create(final LoadGenerator loadGeneratorTemplate) { + final int numSensors = Integer.parseInt(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.NUM_SENSORS), + Integer.toString(LoadGenerator.NUMBER_OF_KEYS_DEFAULT))); + final int periodMs = Integer.parseInt(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.PERIOD_MS), + Integer.toString(LoadGenerator.PERIOD_MS_DEFAULT))); + final double value = Double.parseDouble(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.VALUE), + Integer.toString(LoadGenerator.VALUE_DEFAULT))); + final int threads = Integer.parseInt(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.THREADS), + Integer.toString(LoadGenerator.THREADS_DEFAULT))); + + return loadGeneratorTemplate + 
.setClusterConfig(this.buildClusterConfig()) + .setLoadDefinition(new WorkloadDefinition( + new KeySpace(LoadGenerator.SENSOR_PREFIX_DEFAULT, numSensors), + Duration.ofMillis(periodMs))) + .setGeneratorConfig(new LoadGeneratorConfig( + TitanRecordGenerator.forConstantValue(value), + this.buildRecordSender())) + .withThreads(threads); + } + + private ClusterConfig buildClusterConfig() { + final String bootstrapServer = System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER); + final String kubernetesDnsName = System.getenv(ConfigurationKeys.KUBERNETES_DNS_NAME); + + ClusterConfig clusterConfig; + if (bootstrapServer != null) { // NOPMD + clusterConfig = ClusterConfig.fromBootstrapServer(bootstrapServer); + LOGGER.info("Use bootstrap server '{}'.", bootstrapServer); + } else if (kubernetesDnsName != null) { // NOPMD + clusterConfig = ClusterConfig.fromKubernetesDnsName(kubernetesDnsName); + LOGGER.info("Use Kubernetes DNS name '{}'.", kubernetesDnsName); + } else { + clusterConfig = ClusterConfig.fromBootstrapServer(LoadGenerator.BOOTSTRAP_SERVER_DEFAULT); + LOGGER.info( + "Neither a bootstrap server nor a Kubernetes DNS name was provided. 
Use default bootstrap server '{}'.", // NOCS + LoadGenerator.BOOTSTRAP_SERVER_DEFAULT); + } + + final String port = System.getenv(ConfigurationKeys.PORT); + if (port != null) { + clusterConfig.setPort(Integer.parseInt(port)); + } + + final String portAutoIncrement = System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT); + if (portAutoIncrement != null) { + clusterConfig.setPortAutoIncrement(Boolean.parseBoolean(portAutoIncrement)); + } + + final String clusterNamePrefix = System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX); + if (clusterNamePrefix != null) { + clusterConfig.setClusterNamePrefix(portAutoIncrement); + } + + return clusterConfig; + } + + private RecordSender<ActivePowerRecord> buildRecordSender() { + final LoadGeneratorTarget target = LoadGeneratorTarget.from( + Objects.requireNonNullElse(System.getenv(ConfigurationKeys.TARGET), + LoadGenerator.TARGET_DEFAULT.getValue())); + + final RecordSender<ActivePowerRecord> recordSender; // NOPMD + if (target == LoadGeneratorTarget.KAFKA) { + final String kafkaBootstrapServers = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), + LoadGenerator.KAFKA_BOOTSTRAP_SERVERS_DEFAULT); + final String kafkaInputTopic = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), + LoadGenerator.KAFKA_TOPIC_DEFAULT); + final String schemaRegistryUrl = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), + LoadGenerator.SCHEMA_REGISTRY_URL_DEFAULT); + final Properties kafkaProperties = new Properties(); + kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE)); + kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS)); + kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY)); + recordSender = TitanKafkaSenderFactory.forKafkaConfig( + 
kafkaBootstrapServers, + kafkaInputTopic, + schemaRegistryUrl); + LOGGER.info( + "Use Kafka as target with bootstrap server '{}', schema registry url '{}' and topic '{}'.", // NOCS + kafkaBootstrapServers, schemaRegistryUrl, kafkaInputTopic); + } else if (target == LoadGeneratorTarget.HTTP) { + final URI url = URI.create( + Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.HTTP_URL), + LoadGenerator.HTTP_URI_DEFAULT)); + recordSender = new HttpRecordSender<>(url); + LOGGER.info("Use HTTP server as target with url '{}'.", url); + } else if (target == LoadGeneratorTarget.PUBSUB) { + final String project = System.getenv(ConfigurationKeys.PUBSUB_PROJECT); + final String inputTopic = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.PUBSUB_INPUT_TOPIC), + LoadGenerator.PUBSUB_TOPIC_DEFAULT); + final String emulatorHost = System.getenv(ConfigurationKeys.PUBSUB_EMULATOR_HOST); + if (emulatorHost != null) { // NOPMD + LOGGER.info("Use Pub/Sub as target with emulator host {} and topic '{}'.", + emulatorHost, + inputTopic); + recordSender = TitanPubSubSenderFactory.forEmulatedPubSubConfig(emulatorHost, inputTopic); + } else if (project != null) { // NOPMD + LOGGER.info("Use Pub/Sub as target with project {} and topic '{}'.", project, inputTopic); + recordSender = TitanPubSubSenderFactory.forPubSubConfig(project, inputTopic); + } else { + throw new IllegalStateException("Neither an emulator host nor a project was provided."); + } + } else { + // Should never happen + throw new IllegalStateException("Target " + target + " is not handled yet."); + } + return recordSender; + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java index 6453ef0bd3b6d5a3b5f7f2b77fa20da8f79cb35f..1f02a0e0c910f7d1821c92a0fa71f6d08dbbf6ad 100644 --- 
a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java @@ -1,32 +1,28 @@ package theodolite.commons.workloadgeneration; -import java.net.URI; import java.time.Duration; import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import titan.ccp.model.records.ActivePowerRecord; /** * A Theodolite load generator. */ public final class LoadGenerator { - private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - - private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; - private static final String SENSOR_PREFIX_DEFAULT = "s_"; - private static final int NUMBER_OF_KEYS_DEFAULT = 10; - private static final int PERIOD_MS_DEFAULT = 1000; - private static final int VALUE_DEFAULT = 10; - private static final int THREADS_DEFAULT = 4; - private static final LoadGeneratorTarget TARGET_DEFAULT = LoadGeneratorTarget.KAFKA; - private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; - private static final String KAFKA_TOPIC_DEFAULT = "input"; - private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD - private static final String HTTP_URI_DEFAULT = "http://localhost:8080"; + public static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; + public static final String SENSOR_PREFIX_DEFAULT = "s_"; + public static final int NUMBER_OF_KEYS_DEFAULT = 10; + public static final int PERIOD_MS_DEFAULT = 1000; + public static final int VALUE_DEFAULT = 10; + public static final int THREADS_DEFAULT = 4; + public static final LoadGeneratorTarget TARGET_DEFAULT = LoadGeneratorTarget.KAFKA; + // Target: HTTP + public static final String HTTP_URI_DEFAULT = "http://localhost:8080"; + // Target: 
Kafka + public static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; + public static final String KAFKA_TOPIC_DEFAULT = "input"; // NOCS + public static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD + // Target: Pub/Sub + public static final String PUBSUB_TOPIC_DEFAULT = "input"; // NOCS private ClusterConfig clusterConfig; private WorkloadDefinition loadDefinition; @@ -106,101 +102,7 @@ public final class LoadGenerator { * Create a basic {@link LoadGenerator} from environment variables. */ public static LoadGenerator fromEnvironment() { - final String bootstrapServer = System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER); - final String kubernetesDnsName = System.getenv(ConfigurationKeys.KUBERNETES_DNS_NAME); - - ClusterConfig clusterConfig; - if (bootstrapServer != null) { // NOPMD - clusterConfig = ClusterConfig.fromBootstrapServer(bootstrapServer); - LOGGER.info("Use bootstrap server '{}'.", bootstrapServer); - } else if (kubernetesDnsName != null) { // NOPMD - clusterConfig = ClusterConfig.fromKubernetesDnsName(kubernetesDnsName); - LOGGER.info("Use Kubernetes DNS name '{}'.", kubernetesDnsName); - } else { - clusterConfig = ClusterConfig.fromBootstrapServer(BOOTSTRAP_SERVER_DEFAULT); - LOGGER.info( - "Neither a bootstrap server nor a Kubernetes DNS name was provided. 
Use default bootstrap server '{}'.", // NOCS - BOOTSTRAP_SERVER_DEFAULT); - } - - final String port = System.getenv(ConfigurationKeys.PORT); - if (port != null) { - clusterConfig.setPort(Integer.parseInt(port)); - } - - final String portAutoIncrement = System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT); - if (portAutoIncrement != null) { - clusterConfig.setPortAutoIncrement(Boolean.parseBoolean(portAutoIncrement)); - } - - final String clusterNamePrefix = System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX); - if (clusterNamePrefix != null) { - clusterConfig.setClusterNamePrefix(portAutoIncrement); - } - - final LoadGeneratorTarget target = LoadGeneratorTarget.from( - Objects.requireNonNullElse(System.getenv(ConfigurationKeys.TARGET), - TARGET_DEFAULT.getValue())); - - final RecordSender<ActivePowerRecord> recordSender; // NOPMD - if (target == LoadGeneratorTarget.KAFKA) { - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - KAFKA_BOOTSTRAP_SERVERS_DEFAULT); - final String kafkaInputTopic = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), - KAFKA_TOPIC_DEFAULT); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - SCHEMA_REGISTRY_URL_DEFAULT); - final Properties kafkaProperties = new Properties(); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE)); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS)); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY)); - recordSender = TitanKafkaSenderFactory.forKafkaConfig( - kafkaBootstrapServers, - kafkaInputTopic, - schemaRegistryUrl); - LOGGER.info( - "Use Kafka as target with bootstrap server '{}', schema registry url '{}' and topic 
'{}'.", // NOCS - kafkaBootstrapServers, schemaRegistryUrl, kafkaInputTopic); - } else if (target == LoadGeneratorTarget.HTTP) { - final URI url = URI.create( - Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.HTTP_URL), - HTTP_URI_DEFAULT)); - recordSender = new HttpRecordSender<>(url); - LOGGER.info("Use HTTP server as target with url '{}'.", url); - } else { - // Should never happen - throw new IllegalStateException("Target " + target + " is not handled yet."); - } - - final int numSensors = Integer.parseInt(Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.NUM_SENSORS), - Integer.toString(NUMBER_OF_KEYS_DEFAULT))); - final int periodMs = Integer.parseInt(Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.PERIOD_MS), - Integer.toString(PERIOD_MS_DEFAULT))); - final double value = Double.parseDouble(Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.VALUE), - Integer.toString(VALUE_DEFAULT))); - final int threads = Integer.parseInt(Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.THREADS), - Integer.toString(THREADS_DEFAULT))); - - return new LoadGenerator() - .setClusterConfig(clusterConfig) - .setLoadDefinition(new WorkloadDefinition( - new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors), - Duration.ofMillis(periodMs))) - .setGeneratorConfig(new LoadGeneratorConfig( - TitanRecordGenerator.forConstantValue(value), - recordSender)) - .withThreads(threads); + return new EnvVarLoadGeneratorFactory().create(new LoadGenerator()); } } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java index 086e4de36301693c6873016122a47709b858a0d4..61ae6e86d1be63ca1f4ae3c362122235bb4662f0 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java +++ 
b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java @@ -4,7 +4,7 @@ import java.util.stream.Stream; enum LoadGeneratorTarget { - KAFKA("kafka"), HTTP("http"); + HTTP("http"), KAFKA("kafka"), PUBSUB("pubsub"); private final String value; diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java new file mode 100644 index 0000000000000000000000000000000000000000..ccbeb729236307b26538ee12b1a0e2373a7f0378 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java @@ -0,0 +1,205 @@ +package theodolite.commons.workloadgeneration; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.protobuf.util.Timestamps; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Sends monitoring records to Pub/Sub. 
+ * + * @param <T> Record type to send + */ +public class PubSubRecordSender<T> implements RecordSender<T> { + + private static final int SHUTDOWN_TIMEOUT_SEC = 5; + + private static final Logger LOGGER = LoggerFactory.getLogger(PubSubRecordSender.class); + + private final Function<T, ByteBuffer> recordSerializer; + + private final Function<T, Long> timestampAccessor; + + private final Function<T, String> orderingKeyAccessor; + + private final Publisher publisher; + + private PubSubRecordSender(final Builder<T> builder) { + this.orderingKeyAccessor = builder.orderingKeyAccessor; + this.timestampAccessor = builder.timestampAccessor; + this.recordSerializer = builder.recordSerializer; + + try { + this.publisher = builder.buildPublisher(); + } catch (final IOException e) { + throw new IllegalStateException("Can not create Pub/Sub publisher.", e); + } + } + + /** + * Terminate this {@link PubSubRecordSender} and shutdown the underlying {@link Publisher}. + */ + public void terminate() { + this.publisher.shutdown(); + try { + this.publisher.awaitTermination(SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + throw new IllegalStateException(e); + } + } + + @Override + public void send(final T record) { + final ByteBuffer byteBuffer = this.recordSerializer.apply(record); + final ByteString data = ByteString.copyFrom(byteBuffer); + + final PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder().setData(data); + if (this.orderingKeyAccessor != null) { + messageBuilder.setOrderingKey(this.orderingKeyAccessor.apply(record)); + } + if (this.timestampAccessor != null) { + messageBuilder.setPublishTime(Timestamps.fromMillis(this.timestampAccessor.apply(record))); + } + final PubsubMessage message = messageBuilder.build(); + LOGGER.debug("Send message to PubSub topic {}: {}", this.publisher.getTopicName(), message); + final ApiFuture<String> publishResult = this.publisher.publish(message); + if (LOGGER.isDebugEnabled()) { + try { + 
LOGGER.debug("Publishing result is {}.", publishResult.get()); + } catch (InterruptedException | ExecutionException e) { + LOGGER.warn("Can not get publishing result.", e); + } + } + } + + /** + * Creates a {@link Builder} object for a {@link PubSubRecordSender}. + * + * @param project The project where to write. + * @param topic The topic where to write. + * @param recordSerializer A function serializing objects to {@link ByteBuffer}. + */ + public static <T> Builder<T> builderForProject( + final String project, + final String topic, + final Function<T, ByteBuffer> recordSerializer) { + return new Builder<>(project, topic, recordSerializer); + } + + /** + * Creates a {@link Builder} object for a {@link PubSubRecordSender}. + * + * @param emulatorHost Host of the emulator. + * @param topic The topic where to write. + * @param recordSerializer A function serializing objects to {@link ByteBuffer}. + */ + public static <T> Builder<T> builderForEmulator( + final String emulatorHost, + final String topic, + final Function<T, ByteBuffer> recordSerializer) { + return new WithEmulatorBuilder<>(emulatorHost, topic, recordSerializer); + } + + /** + * Builder class to build a new {@link PubSubRecordSender}. + * + * @param <T> Type of the records that should later be send. + */ + public static class Builder<T> { + + protected final TopicName topicName; + private final Function<T, ByteBuffer> recordSerializer; // NOPMD + private Function<T, Long> timestampAccessor = null; // NOPMD + private Function<T, String> orderingKeyAccessor = null; // NOPMD + + /** + * Creates a Builder object for a {@link PubSubRecordSender}. + * + * @param topic The topic where to write. + * @param recordSerializer A function serializing objects to {@link ByteBuffer}. 
+ */ + private Builder( + final String project, + final String topic, + final Function<T, ByteBuffer> recordSerializer) { + this.topicName = TopicName.of(project, topic); + this.recordSerializer = recordSerializer; + } + + public Builder<T> timestampAccessor(final Function<T, Long> timestampAccessor) { + this.timestampAccessor = timestampAccessor; + return this; + } + + public Builder<T> orderingKeyAccessor(final Function<T, String> keyAccessor) { + this.orderingKeyAccessor = keyAccessor; + return this; + } + + public PubSubRecordSender<T> build() { + return new PubSubRecordSender<>(this); + } + + protected Publisher buildPublisher() throws IOException { + return Publisher.newBuilder(this.topicName).build(); + } + + } + + private static class WithEmulatorBuilder<T> extends Builder<T> { + + private static final String DUMMY_PROJECT = "dummy-project-id"; + + private final String emulatorHost; + + /** + * Creates a Builder object for a {@link PubSubRecordSender}. + * + * @param emulatorHost host of the emulator. + * @param topic The topic where to write. + * @param recordSerializer A function serializing objects to {@link ByteBuffer}. 
+ */ + private WithEmulatorBuilder( + final String emulatorHost, + final String topic, + final Function<T, ByteBuffer> recordSerializer) { + super(DUMMY_PROJECT, topic, recordSerializer); + this.emulatorHost = emulatorHost; + } + + @Override + protected Publisher buildPublisher() throws IOException { + final ManagedChannel channel = ManagedChannelBuilder + .forTarget(this.emulatorHost) + .usePlaintext() + .build(); + + final TransportChannelProvider channelProvider = FixedTransportChannelProvider + .create(GrpcTransportChannel.create(channel)); + final CredentialsProvider credentialsProvider = NoCredentialsProvider.create(); + + return Publisher.newBuilder(super.topicName) + .setChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .build(); + } + + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..5a18376ab4c2fcbf896f847c0ed34af69c5eb507 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java @@ -0,0 +1,50 @@ +package theodolite.commons.workloadgeneration; + +import java.io.IOException; +import java.nio.ByteBuffer; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A factory for creating {@link PubSubRecordSender}s that sends Titan {@link ActivePowerRecord}s. + */ +public final class TitanPubSubSenderFactory { + + private TitanPubSubSenderFactory() {} + + /** + * Create a new {@link PubSubRecordSender} for {@link ActivePowerRecord}s for the given Pub/Sub + * configuration. 
+ */ + public static PubSubRecordSender<ActivePowerRecord> forPubSubConfig( + final String project, + final String topic) { + return PubSubRecordSender + .builderForProject(project, topic, TitanPubSubSenderFactory::serialize) + // .orderingKeyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) + .build(); + } + + /** + * Create a new {@link PubSubRecordSender} for {@link ActivePowerRecord}s for the given PubSub + * emulator configuration. + */ + public static PubSubRecordSender<ActivePowerRecord> forEmulatedPubSubConfig( + final String emulatorHost, + final String topic) { + return PubSubRecordSender + .builderForEmulator(emulatorHost, topic, TitanPubSubSenderFactory::serialize) + // .orderingKeyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) + .build(); + } + + private static ByteBuffer serialize(final ActivePowerRecord record) { + try { + return record.toByteBuffer(); + } catch (final IOException e) { + throw new IllegalStateException(e); + } + } + +} diff --git a/theodolite-benchmarks/settings.gradle b/theodolite-benchmarks/settings.gradle index 4ef9d714edc9aa2f46549382d25127d7b40e91fd..0040989a8b3b02487c2d7328726b7caadb90f32f 100644 --- a/theodolite-benchmarks/settings.gradle +++ b/theodolite-benchmarks/settings.gradle @@ -7,6 +7,7 @@ include 'hazelcastjet-commons' include 'beam-commons' include 'uc1-load-generator' +include 'uc1-commons' include 'uc1-kstreams' include 'uc1-flink' include 'uc1-hazelcastjet' diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties index 50db1510ab5d7f6b8c9b1a75f112719209c351ce..70cc5e94a64b8218344263d9d9d2ba3421fd69fd 100644 --- a/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties @@ -1,6 +1,8 @@ 
application.name=theodolite-uc1-application application.version=0.0.1 +sink.type=logger + kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output @@ -13,4 +15,4 @@ cache.max.bytes.buffering=-1 specific.avro.reader=True enable.auto.commit.config=True -auto.offset.reset.config=earliest \ No newline at end of file +auto.offset.reset.config=earliest diff --git a/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties index 50db1510ab5d7f6b8c9b1a75f112719209c351ce..70cc5e94a64b8218344263d9d9d2ba3421fd69fd 100644 --- a/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties @@ -1,6 +1,8 @@ application.name=theodolite-uc1-application application.version=0.0.1 +sink.type=logger + kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output @@ -13,4 +15,4 @@ cache.max.bytes.buffering=-1 specific.avro.reader=True enable.auto.commit.config=True -auto.offset.reset.config=earliest \ No newline at end of file +auto.offset.reset.config=earliest diff --git a/theodolite-benchmarks/uc1-beam/build.gradle b/theodolite-benchmarks/uc1-beam/build.gradle index 502e94fa737fb2ae1bab861407b27575cd8766ca..659eb09e67487132bca2b3ecb82298690dbf33c6 100644 --- a/theodolite-benchmarks/uc1-beam/build.gradle +++ b/theodolite-benchmarks/uc1-beam/build.gradle @@ -2,4 +2,7 @@ plugins { id 'theodolite.beam' } - +dependencies { + implementation project(':uc1-commons') + implementation 'org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.35.0' +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/ConverterAdapter.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/ConverterAdapter.java new file mode 100644 index 
0000000000000000000000000000000000000000..e368c3a06cde50ea8d49d84b038fb2ec5aa97d1a --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/ConverterAdapter.java @@ -0,0 +1,40 @@ +package application; + +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.TypeDescriptor; +import rocks.theodolite.benchmarks.uc1.commons.RecordConverter; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * {@link SimpleFunction} which wraps a {@link RecordConverter} to be used with Beam. + * + * @param <T> type the {@link RecordConverter} is associated with. + */ +public class ConverterAdapter<T> extends SimpleFunction<ActivePowerRecord, T> { + + private static final long serialVersionUID = -5263671231838353747L; // NOPMD + + private final RecordConverter<T> recordConverter; + private final TypeDescriptor<T> type; + + /** + * Create a new {@link ConverterAdapter} with a given {@link RecordConverter} and the associated + * type. + */ + public ConverterAdapter(final RecordConverter<T> recordConverter, final Class<T> type) { + super(); + this.recordConverter = recordConverter; + this.type = TypeDescriptor.of(type); + } + + @Override + public T apply(final ActivePowerRecord record) { + return this.recordConverter.convert(record); + } + + @Override + public TypeDescriptor<T> getOutputTypeDescriptor() { + return this.type; + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java new file mode 100644 index 0000000000000000000000000000000000000000..04b47cd8c4c6a976fc602fa2fbf93dcaaa36680e --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java @@ -0,0 +1,41 @@ +package application; + +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; 
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A {@link PTransform} for a generic {@link DatabaseAdapter}. + * + * @param <T> Type parameter of {@link DatabaseAdapter}. + */ +public class GenericSink<T> extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> { + + private static final long serialVersionUID = 1L; + + private final DatabaseAdapter<T> databaseAdapter; + private final Class<T> type; + + /** + * Create a {@link GenericSink} for the provided {@link DatabaseAdapter}. Requires also the + * corresponding {@link Class} object for Beam. + */ + public GenericSink(final DatabaseAdapter<T> databaseAdapter, final Class<T> type) { + super(); + this.databaseAdapter = databaseAdapter; + this.type = type; + } + + @Override + public PCollection<?> expand(final PCollection<ActivePowerRecord> activePowerRecords) { + return activePowerRecords + .apply(MapElements + .via(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter(), this.type))) + .apply(ParDo.of(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter()))); + + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java deleted file mode 100644 index 251523441e339cbaf58c7e3a1b30e97cc354df18..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java +++ /dev/null @@ -1,24 +0,0 @@ -package application; - -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.values.KV; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Logs all Key Value pairs. 
- */ -public class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> { - private static final long serialVersionUID = 4328743; - private static final Logger LOGGER = LoggerFactory.getLogger(LogKeyValue.class); - - /** - * Logs all key value pairs it processes. - */ - @ProcessElement - public void processElement(@Element final KV<String, String> kv, - final OutputReceiver<KV<String, String>> out) { - LOGGER.info("Key: {}, Value: {}", kv.getKey(), kv.getValue()); - out.output(kv); - } -} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/MapToGson.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/MapToGson.java deleted file mode 100644 index 6b0c6bc4ddfe78c22028da5b8cf7dde7ed57fced..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/MapToGson.java +++ /dev/null @@ -1,26 +0,0 @@ -package application; - -import com.google.gson.Gson; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.values.KV; -import titan.ccp.model.records.ActivePowerRecord; - -/** - * Converts a Map into a json String. 
- */ -public class MapToGson extends SimpleFunction<KV<String, ActivePowerRecord>, KV<String, String>> { - private static final long serialVersionUID = 7168356203579050214L; - private transient Gson gsonObj = new Gson(); - - @Override - public KV<String, String> apply( - final KV<String, ActivePowerRecord> kv) { - - if (this.gsonObj == null) { - this.gsonObj = new Gson(); - } - - final String gson = this.gsonObj.toJson(kv.getValue()); - return KV.of(kv.getKey(), gson); - } -} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..91052827ff58f0bb52d289073c84e31cfc234c31 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java @@ -0,0 +1,16 @@ +package application; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.configuration2.Configuration; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * Interface for a class that creates sinks (i.e., {@link PTransform}s that map and store + * {@link ActivePowerRecord}s, optionally, using a {@link Configuration}. 
+ */ +public interface SinkFactory { + + PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(Configuration configuration); + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java new file mode 100644 index 0000000000000000000000000000000000000000..82ca2573ef5108e2f2a9423400ca63be1760d449 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java @@ -0,0 +1,52 @@ +package application; + +import application.firestore.FirestoreSink; +import java.util.stream.Stream; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.configuration2.Configuration; +import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * Supported Sink types, i.e., {@link PTransform} for converting and storing + * {@link ActivePowerRecord}s. + */ +public enum SinkType implements SinkFactory { + + LOGGER("logger") { + @Override + public PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create( + final Configuration config) { + return new GenericSink<>(LogWriterFactory.forJson(), String.class); + } + }, + FIRESTORE("firestore") { + @Override + public PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create( + final Configuration config) { + return FirestoreSink.fromConfig(config); + } + }; + + private final String value; + + SinkType(final String value) { + this.value = value; + } + + public String getValue() { + return this.value; + } + + /** + * Create a new {@link SinkType} from its string representation. 
+ */ + public static SinkType from(final String value) { + return Stream.of(SinkType.values()) + .filter(t -> t.value.equals(value)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Sink '" + value + "' does not exist.")); + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java index eaff08ac78cd18ddfd47eb2949ca13340ecc27b8..352b32a29ff6cfd5d01a4e74798f79c8d08c769a 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java @@ -1,52 +1,39 @@ package application; -import java.util.Map; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Values; import org.apache.commons.configuration2.Configuration; import theodolite.commons.beam.AbstractPipeline; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import titan.ccp.model.records.ActivePowerRecord; - /** - * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To - * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload - * generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST adress - * using--flinkMaster as run parameter. To persist logs add - * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard - * Input Output in Common in the Run Configuration Start via Eclipse Run. + * Implementation of benchmark UC1: Database Storage with Apache Beam. 
*/ public final class Uc1BeamPipeline extends AbstractPipeline { + public static final String SINK_TYPE_KEY = "sink.type"; + protected Uc1BeamPipeline(final PipelineOptions options, final Configuration config) { super(options, config); - // Set Coders for Classes that will be distributed - final CoderRegistry cr = this.getCoderRegistry(); - cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); - - // build KafkaConsumerConfig - final Map<String, Object> consumerConfig = this.buildConsumerConfig(); + final SinkType sinkType = SinkType.from(config.getString(SINK_TYPE_KEY)); - // Create Pipeline transformations - final KafkaActivePowerTimestampReader kafka = - new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig); + // Set Coders for classes that will be distributed + final CoderRegistry cr = super.getCoderRegistry(); + cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); - final LogKeyValue logKeyValue = new LogKeyValue(); - final MapToGson mapToGson = new MapToGson(); + final KafkaActivePowerTimestampReader kafka = new KafkaActivePowerTimestampReader( + super.bootstrapServer, + super.inputTopic, + super.buildConsumerConfig()); - // Apply pipeline transformations - // Read from Kafka - this.apply(kafka) - // Map to Gson - .apply(MapElements - .via(mapToGson)) - // Print to console - .apply(ParDo.of(logKeyValue)); + super.apply(kafka) + .apply(Values.create()) + .apply(sinkType.create(config)); } + } diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/WriterAdapter.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/WriterAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..eb9a2670cd8e61ed103a277e9d26072dc926dbeb --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/WriterAdapter.java @@ -0,0 +1,28 @@ +package application; + +import org.apache.beam.sdk.transforms.DoFn; 
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter; + +/** + * {@link DoFn} which wraps a {@link DatabaseAdapter} to be used with Beam. + * + * @param <T> type the {@link DatabaseWriter} is associated with. + */ +public class WriterAdapter<T> extends DoFn<T, Void> { + + private static final long serialVersionUID = -5263671231838353742L; // NOPMD + + private final DatabaseWriter<T> databaseWriter; + + public WriterAdapter(final DatabaseWriter<T> databaseWriter) { + super(); + this.databaseWriter = databaseWriter; + } + + @ProcessElement + public void processElement(@Element final T record, final OutputReceiver<Void> out) { + this.databaseWriter.write(record); + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java new file mode 100644 index 0000000000000000000000000000000000000000..ab4617ecd1a46e083c863d26c999a25ee5008836 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java @@ -0,0 +1,52 @@ +package application.firestore; + +import com.google.firestore.v1.Document; +import com.google.firestore.v1.Value; +import java.io.IOException; +import org.apache.beam.sdk.transforms.SimpleFunction; +import titan.ccp.model.records.ActivePowerRecord; + +final class DocumentMapper extends SimpleFunction<ActivePowerRecord, Document> { + + private static final long serialVersionUID = -5263671231838353749L; // NOPMD + + private transient FirestoreConfig firestoreConfig; + + private final String collection; + + public DocumentMapper(final String collection) { + super(); + this.collection = collection; + } + + @Override + public Document apply(final ActivePowerRecord record) { + return Document + .newBuilder() + .setName(this.createDocumentName(record.getIdentifier() + record.getTimestamp())) + .putFields("identifier", 
+ Value.newBuilder().setStringValue(record.getIdentifier()).build()) + .putFields("timestamp", Value.newBuilder().setIntegerValue(record.getTimestamp()).build()) + .putFields("valueInW", Value.newBuilder().setDoubleValue(record.getValueInW()).build()) + .build(); + } + + private String createDocumentName(final String documentId) { + this.initFirestoreConfig(); + return "projects/" + this.firestoreConfig.getProjectId() + + "/databases/" + this.firestoreConfig.getDatabaseDdlRequest() + + "/documents/" + this.collection + + "/" + documentId; + } + + private void initFirestoreConfig() { + if (this.firestoreConfig == null) { + try { + this.firestoreConfig = FirestoreConfig.createFromDefaults(); + } catch (final IOException e) { + throw new IllegalStateException("Cannot create Firestore configuration.", e); + } + } + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..eb62d69f907cc27f3974f09942e5d75ba701a34f --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java @@ -0,0 +1,29 @@ +package application.firestore; + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.firestore.FirestoreOptions; +import java.io.IOException; + +final class FirestoreConfig { + + private final FirestoreOptions firestoreOptions; + + private FirestoreConfig(final FirestoreOptions firestoreOptions) { + this.firestoreOptions = firestoreOptions; + } + + public String getProjectId() { + return this.firestoreOptions.getProjectId(); + } + + public String getDatabaseDdlRequest() { + return this.firestoreOptions.getProjectId(); + } + + public static FirestoreConfig createFromDefaults() throws IOException { + return new FirestoreConfig(FirestoreOptions.getDefaultInstance().toBuilder() + 
.setCredentials(GoogleCredentials.getApplicationDefault()) + .build()); + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java new file mode 100644 index 0000000000000000000000000000000000000000..a1db24eeb9e05f3b8e198621f4a3e7107e095b13 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java @@ -0,0 +1,41 @@ +package application.firestore; + +import com.google.cloud.firestore.DocumentSnapshot; +import com.google.firestore.v1.Document; +import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.configuration2.Configuration; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A {@link PTransform} mapping {@link ActivePowerRecord}s to {@link Document}s, followed by storing + * these {@link DocumentSnapshot} to Firestore. 
+ */ +public class FirestoreSink extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> { + + public static final String SINK_FIRESTORE_COLLECTION_KEY = "sink.firestore.collection"; + + private static final long serialVersionUID = 1L; + + private final String collectionName; + + public FirestoreSink(final String collectionName) { + super(); + this.collectionName = collectionName; + } + + @Override + public PCollection<?> expand(final PCollection<ActivePowerRecord> activePowerRecords) { + return activePowerRecords + .apply(MapElements.via(new DocumentMapper(this.collectionName))) + .apply(MapElements.via(new UpdateOperationMapper())) + .apply(FirestoreIO.v1().write().batchWrite().build()); + } + + public static FirestoreSink fromConfig(final Configuration config) { + final String collectionName = config.getString(SINK_FIRESTORE_COLLECTION_KEY); + return new FirestoreSink(collectionName); + } +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/UpdateOperationMapper.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/UpdateOperationMapper.java new file mode 100644 index 0000000000000000000000000000000000000000..d67bed2aedbd97bfe4271efa0514c8d4e594683e --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/UpdateOperationMapper.java @@ -0,0 +1,18 @@ +package application.firestore; + +import com.google.firestore.v1.Document; +import com.google.firestore.v1.Write; +import org.apache.beam.sdk.transforms.SimpleFunction; + +final class UpdateOperationMapper extends SimpleFunction<Document, Write> { + + private static final long serialVersionUID = -5263671231838353748L; // NOPMD + + @Override + public Write apply(final Document document) { + return Write.newBuilder() + .setUpdate(document) + .build(); + } + +} diff --git a/theodolite-benchmarks/uc1-commons/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc1-commons/.settings/org.eclipse.jdt.ui.prefs new file mode 
100644 index 0000000000000000000000000000000000000000..713419c8d3d74d3bd7fd05c3e839367753fcdee0 --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/.settings/org.eclipse.jdt.ui.prefs @@ -0,0 +1,127 @@ +cleanup.add_default_serial_version_id=true +cleanup.add_generated_serial_version_id=false +cleanup.add_missing_annotations=true +cleanup.add_missing_deprecated_annotations=true +cleanup.add_missing_methods=false +cleanup.add_missing_nls_tags=false +cleanup.add_missing_override_annotations=true +cleanup.add_missing_override_annotations_interface_methods=true +cleanup.add_serial_version_id=false +cleanup.always_use_blocks=true +cleanup.always_use_parentheses_in_expressions=false +cleanup.always_use_this_for_non_static_field_access=true +cleanup.always_use_this_for_non_static_method_access=true +cleanup.convert_functional_interfaces=false +cleanup.convert_to_enhanced_for_loop=true +cleanup.correct_indentation=true +cleanup.format_source_code=true +cleanup.format_source_code_changes_only=false +cleanup.insert_inferred_type_arguments=false +cleanup.make_local_variable_final=true +cleanup.make_parameters_final=true +cleanup.make_private_fields_final=true +cleanup.make_type_abstract_if_missing_method=false +cleanup.make_variable_declarations_final=true +cleanup.never_use_blocks=false +cleanup.never_use_parentheses_in_expressions=true +cleanup.organize_imports=true +cleanup.qualify_static_field_accesses_with_declaring_class=false +cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +cleanup.qualify_static_member_accesses_with_declaring_class=true +cleanup.qualify_static_method_accesses_with_declaring_class=false +cleanup.remove_private_constructors=true +cleanup.remove_redundant_modifiers=false +cleanup.remove_redundant_semicolons=false +cleanup.remove_redundant_type_arguments=true +cleanup.remove_trailing_whitespaces=true 
+cleanup.remove_trailing_whitespaces_all=true +cleanup.remove_trailing_whitespaces_ignore_empty=false +cleanup.remove_unnecessary_casts=true +cleanup.remove_unnecessary_nls_tags=true +cleanup.remove_unused_imports=true +cleanup.remove_unused_local_variables=false +cleanup.remove_unused_private_fields=true +cleanup.remove_unused_private_members=false +cleanup.remove_unused_private_methods=true +cleanup.remove_unused_private_types=true +cleanup.sort_members=false +cleanup.sort_members_all=false +cleanup.use_anonymous_class_creation=false +cleanup.use_blocks=true +cleanup.use_blocks_only_for_return_and_throw=false +cleanup.use_lambda=true +cleanup.use_parentheses_in_expressions=true +cleanup.use_this_for_non_static_field_access=true +cleanup.use_this_for_non_static_field_access_only_if_necessary=false +cleanup.use_this_for_non_static_method_access=true +cleanup.use_this_for_non_static_method_access_only_if_necessary=false +cleanup_profile=_CAU-SE-Style +cleanup_settings_version=2 +eclipse.preferences.version=1 +editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true +formatter_profile=_CAU-SE-Style +formatter_settings_version=21 +org.eclipse.jdt.ui.ignorelowercasenames=true +org.eclipse.jdt.ui.importorder=; +org.eclipse.jdt.ui.ondemandthreshold=99 +org.eclipse.jdt.ui.staticondemandthreshold=99 +sp_cleanup.add_default_serial_version_id=true +sp_cleanup.add_generated_serial_version_id=false +sp_cleanup.add_missing_annotations=true +sp_cleanup.add_missing_deprecated_annotations=true +sp_cleanup.add_missing_methods=false +sp_cleanup.add_missing_nls_tags=false +sp_cleanup.add_missing_override_annotations=true +sp_cleanup.add_missing_override_annotations_interface_methods=true +sp_cleanup.add_serial_version_id=false +sp_cleanup.always_use_blocks=true +sp_cleanup.always_use_parentheses_in_expressions=false +sp_cleanup.always_use_this_for_non_static_field_access=true +sp_cleanup.always_use_this_for_non_static_method_access=true 
+sp_cleanup.convert_functional_interfaces=false +sp_cleanup.convert_to_enhanced_for_loop=true +sp_cleanup.correct_indentation=true +sp_cleanup.format_source_code=true +sp_cleanup.format_source_code_changes_only=false +sp_cleanup.insert_inferred_type_arguments=false +sp_cleanup.make_local_variable_final=true +sp_cleanup.make_parameters_final=true +sp_cleanup.make_private_fields_final=true +sp_cleanup.make_type_abstract_if_missing_method=false +sp_cleanup.make_variable_declarations_final=true +sp_cleanup.never_use_blocks=false +sp_cleanup.never_use_parentheses_in_expressions=true +sp_cleanup.on_save_use_additional_actions=true +sp_cleanup.organize_imports=true +sp_cleanup.qualify_static_field_accesses_with_declaring_class=false +sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_with_declaring_class=true +sp_cleanup.qualify_static_method_accesses_with_declaring_class=false +sp_cleanup.remove_private_constructors=true +sp_cleanup.remove_redundant_modifiers=false +sp_cleanup.remove_redundant_semicolons=false +sp_cleanup.remove_redundant_type_arguments=true +sp_cleanup.remove_trailing_whitespaces=true +sp_cleanup.remove_trailing_whitespaces_all=true +sp_cleanup.remove_trailing_whitespaces_ignore_empty=false +sp_cleanup.remove_unnecessary_casts=true +sp_cleanup.remove_unnecessary_nls_tags=true +sp_cleanup.remove_unused_imports=true +sp_cleanup.remove_unused_local_variables=false +sp_cleanup.remove_unused_private_fields=true +sp_cleanup.remove_unused_private_members=false +sp_cleanup.remove_unused_private_methods=true +sp_cleanup.remove_unused_private_types=true +sp_cleanup.sort_members=false +sp_cleanup.sort_members_all=false +sp_cleanup.use_anonymous_class_creation=false +sp_cleanup.use_blocks=true +sp_cleanup.use_blocks_only_for_return_and_throw=false +sp_cleanup.use_lambda=true 
+sp_cleanup.use_parentheses_in_expressions=true +sp_cleanup.use_this_for_non_static_field_access=true +sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false +sp_cleanup.use_this_for_non_static_method_access=true +sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false diff --git a/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.checkstyle.prefs b/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.checkstyle.prefs new file mode 100644 index 0000000000000000000000000000000000000000..87860c815222845c1d264d7d0ce498d3397f8280 --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.checkstyle.prefs @@ -0,0 +1,4 @@ +configFilePath=../config/checkstyle.xml +customModulesJarPaths= +eclipse.preferences.version=1 +enabled=true diff --git a/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.pmd.prefs b/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.pmd.prefs new file mode 100644 index 0000000000000000000000000000000000000000..efbcb8c9e5d449194a48ca1ea42b7d807b573db9 --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/.settings/qa.eclipse.plugin.pmd.prefs @@ -0,0 +1,4 @@ +customRulesJars= +eclipse.preferences.version=1 +enabled=true +ruleSetFilePath=../config/pmd.xml diff --git a/theodolite-benchmarks/uc1-commons/build.gradle b/theodolite-benchmarks/uc1-commons/build.gradle new file mode 100644 index 0000000000000000000000000000000000000000..0f7d31d1f557ecd214b3a57227851d0f70b61084 --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/build.gradle @@ -0,0 +1,24 @@ +plugins { + id 'theodolite.java-commons' +} + +repositories { + mavenCentral() + maven { + url "https://oss.sonatype.org/content/repositories/snapshots/" + } + maven { + url 'https://packages.confluent.io/maven/' + } +} + +dependencies { + // Make this implementation once this is a local subproject. + // Currently, Flink needs its own version of these dependencies. 
+ compileOnly('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } + compileOnly('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } + + implementation 'com.google.code.gson:gson:2.8.9' + + testImplementation 'junit:junit:4.12' +} diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseAdapter.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..a1cb1ade0dc76b168cf9ee54f64d5ac88d6b3a98 --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseAdapter.java @@ -0,0 +1,46 @@ +package rocks.theodolite.benchmarks.uc1.commons; + +import java.util.Objects; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A database adapter consisting of a {@link RecordConverter} and a {@link DatabaseWriter}. + * + * @param <T> intermediate data type written to the database. + */ +public final class DatabaseAdapter<T> { + + private final RecordConverter<T> recordConverter; + + private final DatabaseWriter<T> databaseWriter; + + private DatabaseAdapter(final RecordConverter<T> recordConverter, + final DatabaseWriter<T> databaseWriter) { + this.recordConverter = recordConverter; + this.databaseWriter = databaseWriter; + } + + public RecordConverter<T> getRecordConverter() { + return this.recordConverter; + } + + public DatabaseWriter<T> getDatabaseWriter() { + return this.databaseWriter; + } + + /** + * Create a new {@link DatabaseAdapter}. + * + * @param <T> intermediate data type written to the database. + * @param recordConverter RecordConverter for converting {@link ActivePowerRecord}s to {@code T} + * @param databaseWriter DatabaseWriter for writing converted records to the database. + * @return the {@link DatabaseAdapter}. 
+ */ + public static <T> DatabaseAdapter<T> from(final RecordConverter<T> recordConverter, + final DatabaseWriter<T> databaseWriter) { + Objects.requireNonNull(recordConverter); + Objects.requireNonNull(databaseWriter); + return new DatabaseAdapter<>(recordConverter, databaseWriter); + } + +} diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseWriter.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..1beb269e4b75252ac72f7c30c4a26f7a11de4fb6 --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/DatabaseWriter.java @@ -0,0 +1,13 @@ +package rocks.theodolite.benchmarks.uc1.commons; + +/** + * Writes an object to a database. + * + * @param <T> Type expected by the database. + */ +@FunctionalInterface +public interface DatabaseWriter<T> { + + void write(T record); + +} diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/RecordConverter.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/RecordConverter.java new file mode 100644 index 0000000000000000000000000000000000000000..105f19e0e920e3516f7277cd7804dae210a7d0b1 --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/RecordConverter.java @@ -0,0 +1,15 @@ +package rocks.theodolite.benchmarks.uc1.commons; + +import titan.ccp.model.records.ActivePowerRecord; + +/** + * Converts an {@link ActivePowerRecord} to the type required by a database. + * + * @param <T> Type required by the database. 
+ */ +@FunctionalInterface +public interface RecordConverter<T> { + + T convert(ActivePowerRecord record); + +} diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/JsonConverter.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/JsonConverter.java new file mode 100644 index 0000000000000000000000000000000000000000..f9974affb7bf57fc63e9bfe8ba92fd056da9a97b --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/JsonConverter.java @@ -0,0 +1,22 @@ +package rocks.theodolite.benchmarks.uc1.commons.logger; + +import com.google.gson.Gson; +import java.io.Serializable; +import rocks.theodolite.benchmarks.uc1.commons.RecordConverter; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * {@link RecordConverter} that converts {@link ActivePowerRecord}s to JSON strings. + */ +public class JsonConverter implements RecordConverter<String>, Serializable { + + private static final long serialVersionUID = -5263671231838353748L; // NOPMD + + private static final Gson GSON = new Gson(); + + @Override + public String convert(final ActivePowerRecord activePowerRecord) { + return GSON.toJson(activePowerRecord); + } + +} diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriter.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..d606a6dffd01257b308bf2afebc3088b52793ccf --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriter.java @@ -0,0 +1,22 @@ +package rocks.theodolite.benchmarks.uc1.commons.logger; + +import java.io.Serializable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter; + 
+/** + * Writes string records to a {@link Logger}. + */ +public class LogWriter implements DatabaseWriter<String>, Serializable { + + private static final long serialVersionUID = -5263671231838353749L; // NOPMD + + private static final Logger LOGGER = LoggerFactory.getLogger(LogWriter.class); + + @Override + public void write(final String string) { + LOGGER.info("Record: {}", string); + } + +} diff --git a/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriterFactory.java b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriterFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..305ed933ba3e0d885de9c65aacc6ace8a0884621 --- /dev/null +++ b/theodolite-benchmarks/uc1-commons/src/main/java/rocks/theodolite/benchmarks/uc1/commons/logger/LogWriterFactory.java @@ -0,0 +1,18 @@ +package rocks.theodolite.benchmarks.uc1.commons.logger; + +import org.slf4j.Logger; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; + +/** + * Provides factory methods for creating a dummy {@link DatabaseAdapter} writing records as logs + * using a SLF4J {@link Logger}. 
+ */ +public final class LogWriterFactory { + + private LogWriterFactory() {} + + public static DatabaseAdapter<String> forJson() { + return DatabaseAdapter.from(new JsonConverter(), new LogWriter()); + } + +} diff --git a/theodolite-benchmarks/uc1-flink/build.gradle b/theodolite-benchmarks/uc1-flink/build.gradle index 8a2a359c4840e67581f7bc24f1544ff519f82525..681effe9a347f0fa9f26d6a2caf0668ade09d6c2 100644 --- a/theodolite-benchmarks/uc1-flink/build.gradle +++ b/theodolite-benchmarks/uc1-flink/build.gradle @@ -2,4 +2,8 @@ plugins { id 'theodolite.flink' } +dependencies { + implementation project(':uc1-commons') +} + mainClassName = "theodolite.uc1.application.HistoryServiceFlinkJob" diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConverterAdapter.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConverterAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..af0a0b1cf5a25d22c1fdc5e7adb7467be03f9b9f --- /dev/null +++ b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConverterAdapter.java @@ -0,0 +1,27 @@ +package theodolite.uc1.application; + +import org.apache.flink.api.common.functions.MapFunction; +import rocks.theodolite.benchmarks.uc1.commons.RecordConverter; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * {@link MapFunction} which wraps a {@link RecordConverter} to be used with Flink. + * + * @param <T> type the {@link RecordConverter} is associated with. 
+ */ +public class ConverterAdapter<T> implements MapFunction<ActivePowerRecord, T> { + + private static final long serialVersionUID = -5263671231838353747L; // NOPMD + + private final RecordConverter<T> recordConverter; + + public ConverterAdapter(final RecordConverter<T> recordConverter) { + this.recordConverter = recordConverter; + } + + @Override + public T map(final ActivePowerRecord record) throws Exception { + return this.recordConverter.convert(record); + } + +} diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/GsonMapper.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/GsonMapper.java deleted file mode 100644 index 831db7fe63be6529e6b7ba299dca92b138ff7d13..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/GsonMapper.java +++ /dev/null @@ -1,22 +0,0 @@ -package theodolite.uc1.application; - -import com.google.gson.Gson; -import org.apache.flink.api.common.functions.MapFunction; -import titan.ccp.model.records.ActivePowerRecord; - -/** - * {@link MapFunction} which maps {@link ActivePowerRecord}s to their representation as JSON - * strings. 
- */ -public class GsonMapper implements MapFunction<ActivePowerRecord, String> { - - private static final long serialVersionUID = -5263671231838353747L; // NOPMD - - private static final Gson GSON = new Gson(); - - @Override - public String map(final ActivePowerRecord value) throws Exception { - return GSON.toJson(value); - } - -} diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java index 0cb132e526486e71409736b843dd25bdfa52da4a..41131152734f68dd34489461b1ad31d94a970eac 100644 --- a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java @@ -7,6 +7,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; +import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; import theodolite.commons.flink.KafkaConnectorFactory; import titan.ccp.common.configuration.ServiceConfigurations; import titan.ccp.model.records.ActivePowerRecord; @@ -22,6 +24,8 @@ public final class HistoryServiceFlinkJob { private final StreamExecutionEnvironment env; private final String applicationId; + private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson(); + /** * Create a new instance of the {@link HistoryServiceFlinkJob}. 
*/ @@ -69,9 +73,10 @@ public final class HistoryServiceFlinkJob { stream // .rebalance() - .map(new GsonMapper()) - .flatMap((record, c) -> LOGGER.info("Record: {}", record)) - .returns(Types.GENERIC(Object.class)); // Will never be used + .map(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter())) + .returns(Types.STRING) + .flatMap(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter())) + .returns(Types.VOID); // Will never be used } /** diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/WriterAdapter.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/WriterAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..b2f375ec5f5a66141a2551015fb9fbd013ab9c16 --- /dev/null +++ b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/WriterAdapter.java @@ -0,0 +1,28 @@ +package theodolite.uc1.application; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.util.Collector; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter; + +/** + * {@link FlatMapFunction} which wraps a {@link DatabaseAdapter} to be used with Flink. + * + * @param <T> type the {@link DatabaseWriter} is associated with. 
+ */ +public class WriterAdapter<T> implements FlatMapFunction<T, Void> { + + private static final long serialVersionUID = -5263671231838353747L; // NOPMD + + private final DatabaseWriter<T> databaseWriter; + + public WriterAdapter(final DatabaseWriter<T> databaseWriter) { + this.databaseWriter = databaseWriter; + } + + @Override + public void flatMap(final T value, final Collector<Void> out) throws Exception { + this.databaseWriter.write(value); + } + +} diff --git a/theodolite-benchmarks/uc1-kstreams/build.gradle b/theodolite-benchmarks/uc1-kstreams/build.gradle index 74cfb450ec80759f60582c25ab844e3398d5bf02..1460a99a2aad7767b84259494c4c231344862545 100644 --- a/theodolite-benchmarks/uc1-kstreams/build.gradle +++ b/theodolite-benchmarks/uc1-kstreams/build.gradle @@ -2,4 +2,8 @@ plugins { id 'theodolite.kstreams' } +dependencies { + implementation project(':uc1-commons') +} + mainClassName = "theodolite.uc1.application.HistoryService" diff --git a/theodolite-benchmarks/uc1-kstreams/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java b/theodolite-benchmarks/uc1-kstreams/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java index 427a838f45f6807ede00dcb68ebf8c5580f28ce6..64d6d08c30c1a015c668e744fe164bda3f493aa4 100644 --- a/theodolite-benchmarks/uc1-kstreams/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java +++ b/theodolite-benchmarks/uc1-kstreams/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java @@ -1,13 +1,12 @@ package theodolite.uc1.streamprocessing; -import com.google.gson.Gson; import java.util.Properties; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; +import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; import 
titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; import titan.ccp.model.records.ActivePowerRecord; @@ -16,12 +15,11 @@ import titan.ccp.model.records.ActivePowerRecord; */ public class TopologyBuilder { - private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); - private static final Gson GSON = new Gson(); - private final String inputTopic; private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; + private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson(); + private final StreamsBuilder builder = new StreamsBuilder(); @@ -42,8 +40,8 @@ public class TopologyBuilder { .stream(this.inputTopic, Consumed.with( Serdes.String(), this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) - .mapValues(v -> GSON.toJson(v)) - .foreach((k, record) -> LOGGER.info("Record: {}", record)); + .mapValues(this.databaseAdapter.getRecordConverter()::convert) + .foreach((k, record) -> this.databaseAdapter.getDatabaseWriter().write(record)); return this.builder.build(properties); } diff --git a/theodolite/examples/operator/example-benchmark.yaml b/theodolite/examples/operator/example-benchmark.yaml index 5f68af04750bcd779c9682ede69d6c68b9fb3e92..62920091e831ff914fb67e85a67cd3f1d98995ab 100644 --- a/theodolite/examples/operator/example-benchmark.yaml +++ b/theodolite/examples/operator/example-benchmark.yaml @@ -34,7 +34,7 @@ spec: properties: loadGenMaxRecords: "150000" kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" + bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092" topics: - name: "input" numPartitions: 40 diff --git a/theodolite/examples/operator/example-configmap.yaml b/theodolite/examples/operator/example-configmap.yaml index 210ce32d3fc0f75b9ffce874d1fa0a1ea9bdc3cd..db511a94cc903869677f2e447d45baf0d983ac6c 100644 --- a/theodolite/examples/operator/example-configmap.yaml +++ b/theodolite/examples/operator/example-configmap.yaml @@ -24,7 +24,7 @@ data: image: 
ghcr.io/cau-se/theodolite-uc1-kstreams-app:latest env: - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" - name: JAVA_OPTS @@ -65,7 +65,7 @@ data: - name: KUBERNETES_DNS_NAME value: "titan-ccp-load-generator.$(KUBERNETES_NAMESPACE).svc.cluster.local" - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" uc1-load-generator-service.yaml: | diff --git a/theodolite/examples/resources/uc1-kstreams-deployment.yaml b/theodolite/examples/resources/uc1-kstreams-deployment.yaml index fdd1ff867ac83beb10856baec53569c88169232e..1951b1177572dbd1276afcbc6770f91f9f5ff168 100644 --- a/theodolite/examples/resources/uc1-kstreams-deployment.yaml +++ b/theodolite/examples/resources/uc1-kstreams-deployment.yaml @@ -21,7 +21,7 @@ spec: name: jmx env: - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" - name: JAVA_OPTS diff --git a/theodolite/examples/resources/uc1-load-generator-deployment.yaml b/theodolite/examples/resources/uc1-load-generator-deployment.yaml index 9f9ccc6ae39407bb1f027e1e23cb152944b869e0..65048a97d5de3d831f782db329e295a5e5ceb727 100644 --- a/theodolite/examples/resources/uc1-load-generator-deployment.yaml +++ b/theodolite/examples/resources/uc1-load-generator-deployment.yaml @@ -27,6 +27,6 @@ spec: - name: KUBERNETES_DNS_NAME value: "titan-ccp-load-generator.$(KUBERNETES_NAMESPACE).svc.cluster.local" - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" + value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-cp-schema-registry:8081" diff --git 
a/theodolite/examples/standalone/example-benchmark.yaml b/theodolite/examples/standalone/example-benchmark.yaml index 4d67399231778c91cebb3ffe088e2d26ef388008..254fb4628a595b627f9f4260e3d5478984cec1c6 100644 --- a/theodolite/examples/standalone/example-benchmark.yaml +++ b/theodolite/examples/standalone/example-benchmark.yaml @@ -31,7 +31,7 @@ loadTypes: properties: loadGenMaxRecords: "150000" kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" + bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092" topics: - name: "input" numPartitions: 40 diff --git a/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt b/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt index 425a4f3b0634d53f8b1d5c4b8abdba9ca81c3f2b..924305660798e6dbed06662ef4e393c63f5f2bfa 100644 --- a/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt +++ b/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt @@ -4,7 +4,7 @@ import theodolite.benchmark.BenchmarkExecution import theodolite.util.InvalidPatcherConfigurationException import javax.enterprise.context.ApplicationScoped -private const val CONSUMER_LAG_QUERY = "sum by(group)(kafka_consumergroup_group_lag >= 0)" +private const val CONSUMER_LAG_QUERY = "sum by(consumergroup) (kafka_consumergroup_lag >= 0)" private const val DROPPED_RECORDS_QUERY = "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)" @ApplicationScoped diff --git a/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/AbstractK8sLoader.kt b/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/AbstractK8sLoader.kt index 871b8cf43907fcb8b0b5ea501c6b47f82e56ff69..36cfef9ce912886a638c200b502923dfe03ef5d0 100644 --- a/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/AbstractK8sLoader.kt +++ b/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/AbstractK8sLoader.kt @@ -13,6 +13,7 @@ abstract class AbstractK8sLoader: K8sResourceLoader { "Deployment" -> 
loadDeployment(resourceString) "Service" -> loadService(resourceString) "ServiceMonitor" -> loadServiceMonitor(resourceString) + "PodMonitor" -> loadPodMonitor(resourceString) "ConfigMap" -> loadConfigmap(resourceString) "StatefulSet" -> loadStatefulSet(resourceString) "Execution" -> loadExecution(resourceString) @@ -51,6 +52,16 @@ abstract class AbstractK8sLoader: K8sResourceLoader { return loadCustomResourceWrapper(resource, context) } + override fun loadPodMonitor(resource: String): KubernetesResource { + val context = K8sContextFactory().create( + api = "v1", + scope = "Namespaced", + group = "monitoring.coreos.com", + plural = "podmonitors" + ) + return loadCustomResourceWrapper(resource, context) + } + override fun loadExecution(resource: String): KubernetesResource { val context = K8sContextFactory().create( api = "v1", diff --git a/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/K8sResourceLoader.kt b/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/K8sResourceLoader.kt index c123ab2958132cb43ad188136f738b561e91310b..1487b64bf4f7fbcc735539a429be9237d41205bc 100644 --- a/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/K8sResourceLoader.kt +++ b/theodolite/src/main/kotlin/theodolite/k8s/resourceLoader/K8sResourceLoader.kt @@ -11,5 +11,6 @@ interface K8sResourceLoader { fun loadBenchmark(resource: String): KubernetesResource fun loadConfigmap(resource: String): KubernetesResource fun loadServiceMonitor(resource: String): KubernetesResource + fun loadPodMonitor(resource: String): KubernetesResource fun loadCustomResourceWrapper(resource: String, context: CustomResourceDefinitionContext): KubernetesResource } \ No newline at end of file diff --git a/theodolite/src/test/resources/k8s-resource-files/test-benchmark.yaml b/theodolite/src/test/resources/k8s-resource-files/test-benchmark.yaml index e690aa56d74d695b0b81469023ccf82d0046cf45..1ba204bb2821f9b734706d322322b28220ef19d5 100644 --- 
a/theodolite/src/test/resources/k8s-resource-files/test-benchmark.yaml +++ b/theodolite/src/test/resources/k8s-resource-files/test-benchmark.yaml @@ -29,7 +29,7 @@ spec: properties: loadGenMaxRecords: "15000" kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" + bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092" topics: - name: "input" numPartitions: 40