From 4715191a3d981e85d9ec2c5883429666e2db1c94 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Wed, 9 Feb 2022 14:31:02 +0100
Subject: [PATCH] Add beam-uc2-flink benchmark

---
 .../flink-configuration-configmap.yaml        | 66 ++++++++++++++++++
 .../resources/jobmanager-deployment.yaml      | 69 +++++++++++++++++++
 .../resources/jobmanager-rest-service.yaml    | 14 ++++
 .../resources/jobmanager-service.yaml         | 20 ++++++
 .../resources/service-monitor.yaml            | 14 ++++
 .../resources/taskmanager-deployment.yaml     | 64 +++++++++++++++++
 .../resources/taskmanager-service.yaml        | 14 ++++
 .../uc2-beam-flink-benchmark-operator.yaml    | 62 +++++++++++++++++
 8 files changed, 323 insertions(+)
 create mode 100644 theodolite-benchmarks/definitions/uc2-beam-flink/resources/flink-configuration-configmap.yaml
 create mode 100644 theodolite-benchmarks/definitions/uc2-beam-flink/resources/jobmanager-deployment.yaml
 create mode 100644 theodolite-benchmarks/definitions/uc2-beam-flink/resources/jobmanager-rest-service.yaml
 create mode 100644 theodolite-benchmarks/definitions/uc2-beam-flink/resources/jobmanager-service.yaml
 create mode 100644 theodolite-benchmarks/definitions/uc2-beam-flink/resources/service-monitor.yaml
 create mode 100644 theodolite-benchmarks/definitions/uc2-beam-flink/resources/taskmanager-deployment.yaml
 create mode 100644 theodolite-benchmarks/definitions/uc2-beam-flink/resources/taskmanager-service.yaml
 create mode 100644 theodolite-benchmarks/definitions/uc2-beam-flink/uc2-beam-flink-benchmark-operator.yaml

diff --git a/theodolite-benchmarks/definitions/uc2-beam-flink/resources/flink-configuration-configmap.yaml b/theodolite-benchmarks/definitions/uc2-beam-flink/resources/flink-configuration-configmap.yaml
new file mode 100644
index 000000000..36178e2be
--- /dev/null
+++ b/theodolite-benchmarks/definitions/uc2-beam-flink/resources/flink-configuration-configmap.yaml
@@ -0,0 +1,66 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: flink-config
+  labels:
+    app: flink
+data:
+  flink-conf.yaml: |+
+    jobmanager.rpc.address: flink-jobmanager
+    taskmanager.numberOfTaskSlots: 1 #TODO
+    #blob.server.port: 6124
+    #jobmanager.rpc.port: 6123
+    #taskmanager.rpc.port: 6122
+    #queryable-state.proxy.ports: 6125
+    #jobmanager.memory.process.size: 4Gb
+    #taskmanager.memory.process.size: 4Gb
+    #parallelism.default: 1 #TODO
+    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
+    metrics.reporter.prom.interval: 10 SECONDS
+    taskmanager.network.detailed-metrics: true
+  # -> gives metrics about inbound/outbound network queue lengths
+  log4j-console.properties: |+
+    # This affects logging for both user code and Flink
+    rootLogger.level = INFO
+    rootLogger.appenderRef.console.ref = ConsoleAppender
+    rootLogger.appenderRef.rolling.ref = RollingFileAppender
+
+    # Uncomment this if you want to _only_ change Flink's logging
+    #logger.flink.name = org.apache.flink
+    #logger.flink.level = INFO
+
+    # The following lines keep the log level of common libraries/connectors on
+    # log level INFO. The root logger does not override this. You have to manually
+    # change the log levels here.
+    logger.akka.name = akka
+    logger.akka.level = INFO
+    logger.kafka.name= org.apache.kafka
+    logger.kafka.level = INFO
+    logger.hadoop.name = org.apache.hadoop
+    logger.hadoop.level = INFO
+    logger.zookeeper.name = org.apache.zookeeper
+    logger.zookeeper.level = INFO
+
+    # Log all infos to the console
+    appender.console.name = ConsoleAppender
+    appender.console.type = CONSOLE
+    appender.console.layout.type = PatternLayout
+    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+    # Log all infos in the given rolling file
+    appender.rolling.name = RollingFileAppender
+    appender.rolling.type = RollingFile
+    appender.rolling.append = false
+    appender.rolling.fileName = ${sys:log.file}
+    appender.rolling.filePattern = ${sys:log.file}.%i
+    appender.rolling.layout.type = PatternLayout
+    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+    appender.rolling.policies.type = Policies
+    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
+    appender.rolling.policies.size.size=100MB
+    appender.rolling.strategy.type = DefaultRolloverStrategy
+    appender.rolling.strategy.max = 10
+
+    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
+    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
+    logger.netty.level = OFF
\ No newline at end of file
diff --git a/theodolite-benchmarks/definitions/uc2-beam-flink/resources/jobmanager-deployment.yaml b/theodolite-benchmarks/definitions/uc2-beam-flink/resources/jobmanager-deployment.yaml
new file mode 100644
index 000000000..e4f3d1e67
--- /dev/null
+++ b/theodolite-benchmarks/definitions/uc2-beam-flink/resources/jobmanager-deployment.yaml
@@ -0,0 +1,69 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: flink-jobmanager
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: flink
+      component: jobmanager
+  template:
+    metadata:
+      labels:
+        app: flink
+        component: jobmanager
+    spec:
+      terminationGracePeriodSeconds: 0
+      containers:
+        - name: jobmanager
+          image: ghcr.io/cau-se/theodolite-uc2-beam-flink
+          args: ["standalone-job", "--job-classname", "application.Uc2BeamFlink",
+                  "--parallelism=$(PARALLELISM)",
+                  "--disableMetrics=true",
+                  "--fasterCopy"]
+          # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
+          env:
+            - name: KAFKA_BOOTSTRAP_SERVERS
+              value: "theodolite-cp-kafka:9092"
+            - name: SCHEMA_REGISTRY_URL
+              value: "http://theodolite-cp-schema-registry:8081"
+            - name: COMMIT_INTERVAL_MS
+              value: "100"
+            - name: CHECKPOINTING
+              value: "false"
+            - name: PARALLELISM
+              value: "1"
+            - name: "FLINK_STATE_BACKEND"
+              value: "rocksdb"
+            - name: JOB_MANAGER_RPC_ADDRESS
+              value: "flink-jobmanager"
+            - name: FLINK_PROPERTIES
+              value: |+
+                blob.server.port: 6124
+                jobmanager.rpc.port: 6123
+                taskmanager.rpc.port: 6122
+                queryable-state.proxy.ports: 6125
+                jobmanager.memory.process.size: 4Gb
+                taskmanager.memory.process.size: 4Gb
+                parallelism.default: 1 #TODO
+          resources:
+            limits:
+              memory: 4Gi
+              cpu: 1000m
+          ports:
+            - containerPort: 6123
+              name: rpc
+            - containerPort: 6124
+              name: blob-server
+            - containerPort: 8081
+              name: webui
+            - containerPort: 9249
+              name: metrics
+          livenessProbe:
+            tcpSocket:
+              port: 6123
+            initialDelaySeconds: 30
+            periodSeconds: 60
+          securityContext:
+            runAsUser: 9999
diff --git a/theodolite-benchmarks/definitions/uc2-beam-flink/resources/jobmanager-rest-service.yaml b/theodolite-benchmarks/definitions/uc2-beam-flink/resources/jobmanager-rest-service.yaml
new file mode 100644
index 000000000..3d74aaf7f
--- /dev/null
+++ b/theodolite-benchmarks/definitions/uc2-beam-flink/resources/jobmanager-rest-service.yaml
@@ -0,0 +1,14 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: flink-jobmanager-rest
+spec:
+  type: NodePort
+  ports:
+    - name: rest
+      port: 8081
+      targetPort: 8081
+      nodePort: 30081
+  selector:
+    app: flink
+    component: jobmanager
\ No newline at end of file
diff --git a/theodolite-benchmarks/definitions/uc2-beam-flink/resources/jobmanager-service.yaml b/theodolite-benchmarks/definitions/uc2-beam-flink/resources/jobmanager-service.yaml
new file mode 100644
index 000000000..e2ff5d989
--- /dev/null
+++ b/theodolite-benchmarks/definitions/uc2-beam-flink/resources/jobmanager-service.yaml
@@ -0,0 +1,20 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: flink-jobmanager
+  labels:
+    app: flink
+spec:
+  type: ClusterIP
+  ports:
+    - name: rpc
+      port: 6123
+    - name: blob-server
+      port: 6124
+    - name: webui
+      port: 8081
+    - name: metrics
+      port: 9249
+  selector:
+    app: flink
+    component: jobmanager
\ No newline at end of file
diff --git a/theodolite-benchmarks/definitions/uc2-beam-flink/resources/service-monitor.yaml b/theodolite-benchmarks/definitions/uc2-beam-flink/resources/service-monitor.yaml
new file mode 100644
index 000000000..02f78823c
--- /dev/null
+++ b/theodolite-benchmarks/definitions/uc2-beam-flink/resources/service-monitor.yaml
@@ -0,0 +1,14 @@
+apiVersion: monitoring.coreos.com/v1
+kind: ServiceMonitor
+metadata:
+  labels:
+    app: flink
+    appScope: titan-ccp
+  name: flink
+spec:
+  selector:
+    matchLabels:
+        app: flink
+  endpoints:
+    - port: metrics
+      interval: 10s
diff --git a/theodolite-benchmarks/definitions/uc2-beam-flink/resources/taskmanager-deployment.yaml b/theodolite-benchmarks/definitions/uc2-beam-flink/resources/taskmanager-deployment.yaml
new file mode 100644
index 000000000..b565510fd
--- /dev/null
+++ b/theodolite-benchmarks/definitions/uc2-beam-flink/resources/taskmanager-deployment.yaml
@@ -0,0 +1,64 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: flink-taskmanager
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: flink
+      component: taskmanager
+  template:
+    metadata:
+      labels:
+        app: flink
+        component: taskmanager
+    spec:
+      terminationGracePeriodSeconds: 0
+      containers:
+        - name: taskmanager
+          image: ghcr.io/cau-se/theodolite-uc2-beam-flink
+          args: ["taskmanager"]
+          env:
+            - name: KAFKA_BOOTSTRAP_SERVERS
+              value: "theodolite-cp-kafka:9092"
+            - name: SCHEMA_REGISTRY_URL
+              value: "http://theodolite-cp-schema-registry:8081"
+            - name: COMMIT_INTERVAL_MS
+              value: "100"
+            - name: CHECKPOINTING
+              value: "false"
+            - name: "FLINK_STATE_BACKEND"
+              value: "rocksdb"
+            - name: JOB_MANAGER_RPC_ADDRESS
+              value: "flink-jobmanager"
+            # - name: TASK_MANAGER_NUMBER_OF_TASK_SLOTS
+            #   value: "1" #TODO
+            # - name: FLINK_PROPERTIES
+            #   value: |+
+            #     blob.server.port: 6124
+            #     jobmanager.rpc.port: 6123
+            #     taskmanager.rpc.port: 6122
+            #     queryable-state.proxy.ports: 6125
+            #     jobmanager.memory.process.size: 4Gb
+            #     taskmanager.memory.process.size: 4Gb
+            #     #parallelism.default: 1 #TODO
+          resources:
+            limits:
+              memory: 4Gi
+              cpu: 1000m
+          ports:
+            - containerPort: 6122
+              name: rpc
+            - containerPort: 6125
+              name: query-state
+            - containerPort: 9249
+              name: metrics
+          livenessProbe:
+            tcpSocket:
+              port: 6122
+            initialDelaySeconds: 30
+            periodSeconds: 60
+          securityContext:
+            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
+
diff --git a/theodolite-benchmarks/definitions/uc2-beam-flink/resources/taskmanager-service.yaml b/theodolite-benchmarks/definitions/uc2-beam-flink/resources/taskmanager-service.yaml
new file mode 100644
index 000000000..a2e27f64a
--- /dev/null
+++ b/theodolite-benchmarks/definitions/uc2-beam-flink/resources/taskmanager-service.yaml
@@ -0,0 +1,14 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: flink-taskmanager
+  labels:
+    app: flink
+spec:
+  type: ClusterIP
+  ports:
+    - name: metrics
+      port: 9249
+  selector:
+    app: flink
+    component: taskmanager
\ No newline at end of file
diff --git a/theodolite-benchmarks/definitions/uc2-beam-flink/uc2-beam-flink-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc2-beam-flink/uc2-beam-flink-benchmark-operator.yaml
new file mode 100644
index 000000000..3c0851983
--- /dev/null
+++ b/theodolite-benchmarks/definitions/uc2-beam-flink/uc2-beam-flink-benchmark-operator.yaml
@@ -0,0 +1,62 @@
+apiVersion: theodolite.com/v1
+kind: benchmark
+metadata:
+  name: uc2-beam-flink
+spec:
+  sut:
+    resources:
+      - configMap:
+          name: "benchmark-resources-uc2-beam-flink"
+          files:
+          - "flink-configuration-configmap.yaml"
+          - "taskmanager-deployment.yaml"
+          - "taskmanager-service.yaml"
+          - "service-monitor.yaml"
+          - "jobmanager-service.yaml"
+          - "jobmanager-deployment.yaml"
+          #- "jobmanager-rest-service.yaml"
+  loadGenerator:
+    resources:
+      - configMap:
+          name: "benchmark-resources-uc2-load-generator"
+          files:
+          - "uc2-load-generator-deployment.yaml"
+          - "uc2-load-generator-service.yaml"
+  resourceTypes:
+    - typeName: "Instances"
+      patchers:
+        - type: "ReplicaPatcher"
+          resource: "taskmanager-deployment.yaml"
+        - type: "EnvVarPatcher"
+          resource: "jobmanager-deployment.yaml"
+          properties:
+            container: "jobmanager"
+            variableName: "PARALLELISM"
+        - type: "EnvVarPatcher" # required?
+          resource: "taskmanager-deployment.yaml"
+          properties:
+            container: "taskmanager"
+            variableName: "PARALLELISM"
+  loadTypes:
+    - typeName: "NumSensors"
+      patchers:
+        - type: "EnvVarPatcher"
+          resource: "uc2-load-generator-deployment.yaml"
+          properties:
+            container: "workload-generator"
+            variableName: "NUM_SENSORS"
+        - type: NumSensorsLoadGeneratorReplicaPatcher
+          resource: "uc2-load-generator-deployment.yaml"
+          properties:
+            loadGenMaxRecords: "150000"
+  kafkaConfig:
+    bootstrapServer: "theodolite-cp-kafka:9092"
+    topics:
+      - name: "input"
+        numPartitions: 40
+        replicationFactor: 1
+      - name: "output"
+        numPartitions: 40
+        replicationFactor: 1
+      - name: "theodolite-.*"
+        removeOnly: True
\ No newline at end of file
-- 
GitLab