From cb44eb770ef00be281de2963fce877ce33a43575 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de>
Date: Tue, 24 Nov 2020 11:45:27 +0100
Subject: [PATCH] Remove the commit interval option and use key-value pairs
 for config instead

Remove the commit interval option as it is specific to Kafka Streams.
Instead, use the configurations option, which takes key-value pairs and
sets these as environment variables for the UC.
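
For example, the commit interval that was previously set via --commit-ms can
now be passed as a generic configuration:

    --configurations COMMIT_INTERVAL_MS=100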
---
 execution/lib/cli_parser.py                   |  8 ++---
 execution/run_uc.py                           | 33 ++++++++++++-------
 execution/strategies/config.py                |  2 +-
 execution/strategies/strategies/config.py     |  2 +-
 .../search/binary_search_strategy.py          |  8 ++---
 .../strategies/search/check_all_strategy.py   |  2 +-
 .../search/linear_search_strategy.py          |  2 +-
 .../subexperiment_executor.py                 |  4 +--
 execution/theodolite.py                       | 30 ++++++++---------
 .../base/aggregation-deployment.yaml          |  2 --
 10 files changed, 48 insertions(+), 45 deletions(-)
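
Note: the --configurations argument relies on the StoreDictKeyPair argparse
action and the env_dict_default helper already present in lib/cli_parser.py;
neither is changed by this patch. As a rough orientation, such an action could
look like the following minimal sketch (illustrative only; the repository's
actual implementation may differ):

    import argparse

    class StoreDictKeyPair(argparse.Action):
        """Collect KEY=VAL arguments into a dict on the parsed namespace."""

        def __call__(self, parser, namespace, values, option_string=None):
            pairs = {}
            for item in values:
                # Split only on the first '=' so values may contain '='.
                key, _, value = item.partition('=')
                pairs[key] = value
            setattr(namespace, self.dest, pairs)

    # Example: parsing "--configurations COMMIT_INTERVAL_MS=100" with this
    # action yields {'COMMIT_INTERVAL_MS': '100'}, which start_application()
    # then appends to the container's env list.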

diff --git a/execution/lib/cli_parser.py b/execution/lib/cli_parser.py
index 218999b17..a9c51139f 100644
--- a/execution/lib/cli_parser.py
+++ b/execution/lib/cli_parser.py
@@ -63,11 +63,6 @@ def default_parser(description):
                         metavar='<memory limit>',
                         default=os.environ.get('MEMORY_LIMIT', '4Gi'),
                         help='Kubernetes memory limit')
-    parser.add_argument('--commit-ms',
-                        metavar='<commit ms>',
-                        type=int,
-                        default=os.environ.get('COMMIT_MS', 100),
-                        help='Kafka Streams commit interval in milliseconds')
     parser.add_argument('--duration', '-d',
                         metavar='<duration>',
                         type=int,
@@ -93,11 +88,12 @@ def default_parser(description):
                         default=os.environ.get('RESULT_PATH', 'results'),
                         help='A directory path for the results')
     parser.add_argument("--configurations",
+                        metavar="KEY=VAL",
                         dest="configurations",
                         action=StoreDictKeyPair,
                         nargs="+",
                         default=env_dict_default('CONFIGURATIONS'),
-                        metavar="KEY=VAL")
+                        help='Defines the environment variables for the UC')
     return parser
 
 def benchmark_parser(description):
diff --git a/execution/run_uc.py b/execution/run_uc.py
index 6ebf79724..05e7fb694 100644
--- a/execution/run_uc.py
+++ b/execution/run_uc.py
@@ -158,7 +158,9 @@ def start_workload_generator(wg_yaml, dim_value, uc_id):
         return wg_yaml
 
 
-def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instances, uc_id, commit_interval_ms, memory_limit, cpu_limit):
+def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml,
+                      instances, uc_id, memory_limit, cpu_limit,
+                      configurations):
     """Applies the service, service monitor, jmx config map and start the
     use case application.
 
@@ -168,9 +170,9 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instanc
     :param deploy_yaml: The yaml object for the application.
     :param int instances: Number of instances for use case application.
     :param string uc_id: The id of the use case to execute.
-    :param int commit_interval_ms: The commit interval in ms.
     :param string memory_limit: The memory limit for the application.
     :param string cpu_limit: The CPU limit for the application.
+    :param dict configurations: A dictionary of environment variables to set for the use case.
     :return:
         The Service, ServiceMonitor, JMX ConfigMap and Deployment.
         In case the resource already exist/error the yaml object is returned.
@@ -217,10 +219,17 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instanc
         lambda x: x['name'] == 'uc-application', deploy_yaml['spec']['template']['spec']['containers']))
     app_container['image'] = 'theodolite/theodolite-uc' + uc_id \
         + '-kstreams-app:latest'
-    next(filter(lambda x: x['name'] == 'COMMIT_INTERVAL_MS', app_container['env']))[
-        'value'] = str(commit_interval_ms)
+
+    # Set environment variables from the given configurations for the SPE
+    for k, v in configurations.items():
+        conf = {'name': k, 'value': v}
+        app_container['env'].append(conf)
+
+    # Set resources in Kubernetes
     app_container['resources']['limits']['memory'] = memory_limit
     app_container['resources']['limits']['cpu'] = cpu_limit
+
+    # Deploy application
     try:
         app_deploy = appsApi.create_namespaced_deployment(
             namespace=namespace,
@@ -455,7 +464,7 @@ def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics):
     stop_lag_exporter()
 
 
-def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, prometheus_base_url, reset, ns, result_path, reset_only=False):
+def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, execution_minutes, prometheus_base_url, reset, ns, result_path, configurations, reset_only=False):
     """
     Main method to execute one time the benchmark for a given use case.
     Start workload generator/application -> execute -> analyse -> stop all
@@ -466,9 +475,9 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi
     :param int partitions: Number of partitions the kafka topics should have.
     :param string cpu_limit: Max CPU utilazation for application.
     :param string memory_limit: Max memory utilazation for application.
-    :param int commit_interval_ms: Kafka Streams commit interval in milliseconds
     :param int execution_minutes: How long to execute the benchmark.
     :param boolean reset: Flag for reset of cluster before execution.
+    :param dict configurations: Key-value pairs to set as environment variables for the UC.
     :param boolean reset_only: Flag to only reset the application.
     """
     global namespace
@@ -514,9 +523,9 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi
         app_deploy,
         instances,
         uc_id,
-        commit_interval_ms,
         memory_limit,
-        cpu_limit)
+        cpu_limit,
+        configurations)
     print('---------------------')
 
     wait_execution(execution_minutes)
@@ -535,7 +544,7 @@ if __name__ == '__main__':
     logging.basicConfig(level=logging.INFO)
     args = load_variables()
     print('---------------------')
-    main(args.exp_id, args.uc, args.load, args.instances,
-         args.partitions, args.cpu_limit, args.memory_limit,
-         args.commit_ms, args.duration, args.prometheus, args.reset,
-         args.namespace, args.path, args.reset_only)
+    main(args.exp_id, args.uc, args.load, args.instances, args.partitions,
+         args.cpu_limit, args.memory_limit, args.duration, args.prometheus,
+         args.reset, args.namespace, args.path, args.configurations,
+         args.reset_only)
diff --git a/execution/strategies/config.py b/execution/strategies/config.py
index 3741bcd5a..c3cd1ff82 100644
--- a/execution/strategies/config.py
+++ b/execution/strategies/config.py
@@ -10,12 +10,12 @@ class ExperimentConfig:
     partitions: int
     cpu_limit: str
     memory_limit: str
-    kafka_streams_commit_interval_ms: int
     execution_minutes: int
     prometheus_base_url: str
     reset: bool
     namespace: str
     result_path: str
+    configurations: dict
     domain_restriction_strategy: object
     search_strategy: object
     subexperiment_executor: object
diff --git a/execution/strategies/strategies/config.py b/execution/strategies/strategies/config.py
index 3c6a15918..5c31f8c97 100644
--- a/execution/strategies/strategies/config.py
+++ b/execution/strategies/strategies/config.py
@@ -11,9 +11,9 @@ class SubexperimentConfig:
     partitions: int
     cpu_limit: str
     memory_limit: str
-    kafka_streams_commit_interval_ms: int
     execution_minutes: int
     prometheus_base_url: str
     reset: bool
     namespace: str
     result_path: str
+    configurations: dict
diff --git a/execution/strategies/strategies/search/binary_search_strategy.py b/execution/strategies/strategies/search/binary_search_strategy.py
index be7da5402..8856ead05 100644
--- a/execution/strategies/strategies/search/binary_search_strategy.py
+++ b/execution/strategies/strategies/search/binary_search_strategy.py
@@ -5,7 +5,7 @@ from strategies.strategies.config import SubexperimentConfig
 def binary_search(config, dim_value, lower, upper, subexperiment_counter):
     if lower == upper:
         print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}")
-        subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
+        subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
         config.subexperiment_executor.execute(subexperiment_config)
         result = config.subexperiment_evaluator.execute(subexperiment_config)
         if result==1: # successful, the upper neighbor is assumed to also has been successful
@@ -14,14 +14,14 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter):
             return (lower+1, subexperiment_counter)
     elif lower+1==upper:
         print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}")
-        subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
+        subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
         config.subexperiment_executor.execute(subexperiment_config)
         result = config.subexperiment_evaluator.execute(subexperiment_config)
         if result==1: # minimal instances found
             return (lower, subexperiment_counter)
         else: # not successful, check if lower+1 instances are sufficient
             print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[upper]}")
-            subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
+            subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
             config.subexperiment_executor.execute(subexperiment_config)
             result = config.subexperiment_evaluator.execute(subexperiment_config)
             if result == 1: # minimal instances found
@@ -32,7 +32,7 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter):
         # test mid
         mid=(upper+lower)//2
         print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[mid]}")
-        subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
+        subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
         config.subexperiment_executor.execute(subexperiment_config)
         result = config.subexperiment_evaluator.execute(subexperiment_config)
         if result == 1: # success -> search in (lower, mid-1)
diff --git a/execution/strategies/strategies/search/check_all_strategy.py b/execution/strategies/strategies/search/check_all_strategy.py
index 7d8ea6057..8e9d6c3ca 100644
--- a/execution/strategies/strategies/search/check_all_strategy.py
+++ b/execution/strategies/strategies/search/check_all_strategy.py
@@ -12,7 +12,7 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c
         replicas=config.replicass[lower_replicas_bound_index]
         print(f"Run subexperiment {subexperiment_counter} of {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")
 
-        subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
+        subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
 
         config.subexperiment_executor.execute(subexperiment_config)
 
diff --git a/execution/strategies/strategies/search/linear_search_strategy.py b/execution/strategies/strategies/search/linear_search_strategy.py
index c4f57c0d9..f2436658e 100644
--- a/execution/strategies/strategies/search/linear_search_strategy.py
+++ b/execution/strategies/strategies/search/linear_search_strategy.py
@@ -11,7 +11,7 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c
         replicas=config.replicass[lower_replicas_bound_index]
         print(f"Run subexperiment {subexperiment_counter} from at most {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")
 
-        subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
+        subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
 
         config.subexperiment_executor.execute(subexperiment_config)
         result = config.subexperiment_evaluator.execute(subexperiment_config)
diff --git a/execution/strategies/subexperiment_execution/subexperiment_executor.py b/execution/strategies/subexperiment_execution/subexperiment_executor.py
index 3f7af08b7..6931dacfc 100644
--- a/execution/strategies/subexperiment_execution/subexperiment_executor.py
+++ b/execution/strategies/subexperiment_execution/subexperiment_executor.py
@@ -12,9 +12,9 @@ def execute(subexperiment_config):
         partitions=subexperiment_config.partitions,
         cpu_limit=subexperiment_config.cpu_limit,
         memory_limit=subexperiment_config.memory_limit,
-        commit_interval_ms=subexperiment_config.kafka_streams_commit_interval_ms,
         execution_minutes=int(subexperiment_config.execution_minutes),
         prometheus_base_url=subexperiment_config.prometheus_base_url,
         reset=subexperiment_config.reset,
         ns=subexperiment_config.namespace,
-        result_path=subexperiment_config.result_path)
+        result_path=subexperiment_config.result_path,
+        configurations=subexperiment_config.configurations)
diff --git a/execution/theodolite.py b/execution/theodolite.py
index 22be2f69a..ef218d99c 100755
--- a/execution/theodolite.py
+++ b/execution/theodolite.py
@@ -30,8 +30,8 @@ def load_variables():
 
 
 def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit,
-         commit_ms, duration, domain_restriction, search_strategy,
-         prometheus_base_url, reset, namespace, result_path):
+         duration, domain_restriction, search_strategy, prometheus_base_url,
+         reset, namespace, result_path, configurations):
 
     print(f"Domain restriction of search space activated: {domain_restriction}")
     print(f"Chosen search strategy: {search_strategy}")
@@ -49,16 +49,16 @@ def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit,
     # Store metadata
     separator = ","
     lines = [
-            f"UC={uc}\n",
-            f"DIM_VALUES={separator.join(map(str, loads))}\n",
-            f"REPLICAS={separator.join(map(str, instances_list))}\n",
-            f"PARTITIONS={partitions}\n",
-            f"CPU_LIMIT={cpu_limit}\n",
-            f"MEMORY_LIMIT={memory_limit}\n",
-            f"KAFKA_STREAMS_COMMIT_INTERVAL_MS={commit_ms}\n",
-            f"EXECUTION_MINUTES={duration}\n",
-            f"DOMAIN_RESTRICTION={domain_restriction}\n",
-            f"SEARCH_STRATEGY={search_strategy}"
+            f'UC={uc}\n',
+            f'DIM_VALUES={separator.join(map(str, loads))}\n',
+            f'REPLICAS={separator.join(map(str, instances_list))}\n',
+            f'PARTITIONS={partitions}\n',
+            f'CPU_LIMIT={cpu_limit}\n',
+            f'MEMORY_LIMIT={memory_limit}\n',
+            f'EXECUTION_MINUTES={duration}\n',
+            f'DOMAIN_RESTRICTION={domain_restriction}\n',
+            f'SEARCH_STRATEGY={search_strategy}\n',
+            f'CONFIGURATIONS={configurations}'
             ]
     with open(f"{result_path}/exp{exp_id}_uc{uc}_meta.txt", "w") as stream:
         stream.writelines(lines)
@@ -95,11 +95,11 @@ def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit,
         partitions=partitions,
         cpu_limit=cpu_limit,
         memory_limit=memory_limit,
-        kafka_streams_commit_interval_ms=commit_ms,
         execution_minutes=duration,
         prometheus_base_url=prometheus_base_url,
         reset=reset,
         namespace=namespace,
+        configurations=configurations,
         result_path=result_path,
         domain_restriction_strategy=domain_restriction_strategy,
         search_strategy=search_strategy,
@@ -114,6 +114,6 @@ if __name__ == '__main__':
     logging.basicConfig(level=logging.INFO)
     args = load_variables()
     main(args.uc, args.loads, args.instances_list, args.partitions, args.cpu_limit,
-         args.memory_limit, args.commit_ms, args.duration,
+         args.memory_limit, args.duration,
          args.domain_restriction, args.search_strategy, args.prometheus,
-         args.reset, args.namespace, args.path)
+         args.reset, args.namespace, args.path, args.configurations)
diff --git a/execution/uc-application/base/aggregation-deployment.yaml b/execution/uc-application/base/aggregation-deployment.yaml
index 81da3eea7..12bf9e718 100644
--- a/execution/uc-application/base/aggregation-deployment.yaml
+++ b/execution/uc-application/base/aggregation-deployment.yaml
@@ -20,8 +20,6 @@ spec:
         - containerPort: 5555
           name: jmx
         env:
-        - name: COMMIT_INTERVAL_MS
-          value: "100"
         - name: KAFKA_BOOTSTRAP_SERVERS
           value: "my-confluent-cp-kafka:9092"
         - name: SCHEMA_REGISTRY_URL
-- 
GitLab