diff --git a/execution/lib/cli_parser.py b/execution/lib/cli_parser.py index 218999b173306a32237b2657b32bdea061d3d4e0..a9c51139fd9d444c6d915c570093475e655c9417 100644 --- a/execution/lib/cli_parser.py +++ b/execution/lib/cli_parser.py @@ -63,11 +63,6 @@ def default_parser(description): metavar='<memory limit>', default=os.environ.get('MEMORY_LIMIT', '4Gi'), help='Kubernetes memory limit') - parser.add_argument('--commit-ms', - metavar='<commit ms>', - type=int, - default=os.environ.get('COMMIT_MS', 100), - help='Kafka Streams commit interval in milliseconds') parser.add_argument('--duration', '-d', metavar='<duration>', type=int, @@ -93,11 +88,12 @@ def default_parser(description): default=os.environ.get('RESULT_PATH', 'results'), help='A directory path for the results') parser.add_argument("--configurations", + metavar="KEY=VAL", dest="configurations", action=StoreDictKeyPair, nargs="+", default=env_dict_default('CONFIGURATIONS'), - metavar="KEY=VAL") + help='Defines the environment variables for the UC') return parser def benchmark_parser(description): diff --git a/execution/run_uc.py b/execution/run_uc.py index 6ebf797241b45a342214ea4dbd003e371f5bd828..05e7fb6944f87d7e8845a69d385c411cf14b77cf 100644 --- a/execution/run_uc.py +++ b/execution/run_uc.py @@ -158,7 +158,9 @@ def start_workload_generator(wg_yaml, dim_value, uc_id): return wg_yaml -def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instances, uc_id, commit_interval_ms, memory_limit, cpu_limit): +def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, + instances, uc_id, memory_limit, cpu_limit, + configurations): """Applies the service, service monitor, jmx config map and start the use case application. @@ -168,9 +170,9 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instanc :param deploy_yaml: The yaml object for the application. :param int instances: Number of instances for use case application. :param string uc_id: The id of the use case to execute. - :param int commit_interval_ms: The commit interval in ms. :param string memory_limit: The memory limit for the application. :param string cpu_limit: The CPU limit for the application. + :param dict configurations: A dictionary with ENV variables for configurations. :return: The Service, ServiceMonitor, JMX ConfigMap and Deployment. In case the resource already exist/error the yaml object is returned. @@ -217,10 +219,17 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instanc lambda x: x['name'] == 'uc-application', deploy_yaml['spec']['template']['spec']['containers'])) app_container['image'] = 'theodolite/theodolite-uc' + uc_id \ + '-kstreams-app:latest' - next(filter(lambda x: x['name'] == 'COMMIT_INTERVAL_MS', app_container['env']))[ - 'value'] = str(commit_interval_ms) + + # Set configurations environment parameters for SPE + for k,v in configurations.items(): + conf = {'name': k, 'value': v} + app_container['env'].append(conf) + + # Set resources in Kubernetes app_container['resources']['limits']['memory'] = memory_limit app_container['resources']['limits']['cpu'] = cpu_limit + + # Deploy application try: app_deploy = appsApi.create_namespaced_deployment( namespace=namespace, @@ -455,7 +464,7 @@ def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics): stop_lag_exporter() -def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, prometheus_base_url, reset, ns, result_path, reset_only=False): +def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, execution_minutes, prometheus_base_url, reset, ns, result_path, configurations, reset_only=False): """ Main method to execute one time the benchmark for a given use case. Start workload generator/application -> execute -> analyse -> stop all @@ -466,9 +475,9 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi :param int partitions: Number of partitions the kafka topics should have. :param string cpu_limit: Max CPU utilazation for application. :param string memory_limit: Max memory utilazation for application. - :param int commit_interval_ms: Kafka Streams commit interval in milliseconds :param int execution_minutes: How long to execute the benchmark. :param boolean reset: Flag for reset of cluster before execution. + :param dict configurations: Key value pairs for setting env variables of UC. :param boolean reset_only: Flag to only reset the application. """ global namespace @@ -514,9 +523,9 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi app_deploy, instances, uc_id, - commit_interval_ms, memory_limit, - cpu_limit) + cpu_limit, + configurations) print('---------------------') wait_execution(execution_minutes) @@ -535,7 +544,7 @@ if __name__ == '__main__': logging.basicConfig(level=logging.INFO) args = load_variables() print('---------------------') - main(args.exp_id, args.uc, args.load, args.instances, - args.partitions, args.cpu_limit, args.memory_limit, - args.commit_ms, args.duration, args.prometheus, args.reset, - args.namespace, args.path, args.reset_only) + main(args.exp_id, args.uc, args.load, args.instances, args.partitions, + args.cpu_limit, args.memory_limit, args.duration, args.prometheus, + args.reset, args.namespace, args.path, args.configurations, + args.reset_only) diff --git a/execution/strategies/config.py b/execution/strategies/config.py index 3741bcd5a8f025b0efc8bfb6ab53fdf08381ce9f..c3cd1ff82c4926f5efcc741b027996dbc800916b 100644 --- a/execution/strategies/config.py +++ b/execution/strategies/config.py @@ -10,12 +10,12 @@ class ExperimentConfig: partitions: int cpu_limit: str memory_limit: str - kafka_streams_commit_interval_ms: int execution_minutes: int prometheus_base_url: str reset: bool namespace: str result_path: str + configurations: dict domain_restriction_strategy: object search_strategy: object subexperiment_executor: object diff --git a/execution/strategies/strategies/config.py b/execution/strategies/strategies/config.py index 3c6a15918ec8cf923b79e6f4f98564f983deac63..5c31f8c97a4085931cdfa1fa017d4e5909e21915 100644 --- a/execution/strategies/strategies/config.py +++ b/execution/strategies/strategies/config.py @@ -11,9 +11,9 @@ class SubexperimentConfig: partitions: int cpu_limit: str memory_limit: str - kafka_streams_commit_interval_ms: int execution_minutes: int prometheus_base_url: str reset: bool namespace: str result_path: str + configurations: dict diff --git a/execution/strategies/strategies/search/binary_search_strategy.py b/execution/strategies/strategies/search/binary_search_strategy.py index be7da54025c2f9fda1750d8197d3afd4055da790..8856ead0502279f8f8642da87cf56f794cb1b11c 100644 --- a/execution/strategies/strategies/search/binary_search_strategy.py +++ b/execution/strategies/strategies/search/binary_search_strategy.py @@ -5,7 +5,7 @@ from strategies.strategies.config import SubexperimentConfig def binary_search(config, dim_value, lower, upper, subexperiment_counter): if lower == upper: print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) if result==1: # successful, the upper neighbor is assumed to also has been successful @@ -14,14 +14,14 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter): return (lower+1, subexperiment_counter) elif lower+1==upper: print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) if result==1: # minimal instances found return (lower, subexperiment_counter) else: # not successful, check if lower+1 instances are sufficient print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[upper]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) if result == 1: # minimal instances found @@ -32,7 +32,7 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter): # test mid mid=(upper+lower)//2 print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[mid]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) if result == 1: # success -> search in (lower, mid-1) diff --git a/execution/strategies/strategies/search/check_all_strategy.py b/execution/strategies/strategies/search/check_all_strategy.py index 7d8ea605707131d19a023671a77b8f22647d6f51..8e9d6c3ca0924d724c4f55032ebc24a92bc3ad93 100644 --- a/execution/strategies/strategies/search/check_all_strategy.py +++ b/execution/strategies/strategies/search/check_all_strategy.py @@ -12,7 +12,7 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c replicas=config.replicass[lower_replicas_bound_index] print(f"Run subexperiment {subexperiment_counter} of {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) diff --git a/execution/strategies/strategies/search/linear_search_strategy.py b/execution/strategies/strategies/search/linear_search_strategy.py index c4f57c0d9bd82467a5917bbf95fe330c7bd81a58..f2436658eec0bd4160259a09c272def40fbc130c 100644 --- a/execution/strategies/strategies/search/linear_search_strategy.py +++ b/execution/strategies/strategies/search/linear_search_strategy.py @@ -11,7 +11,7 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c replicas=config.replicass[lower_replicas_bound_index] print(f"Run subexperiment {subexperiment_counter} from at most {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) diff --git a/execution/strategies/subexperiment_execution/subexperiment_executor.py b/execution/strategies/subexperiment_execution/subexperiment_executor.py index 3f7af08b7a52d70609f000a34a47c088574ddfd6..6931dacfc72081cbe112c4d6d1003703ba42c526 100644 --- a/execution/strategies/subexperiment_execution/subexperiment_executor.py +++ b/execution/strategies/subexperiment_execution/subexperiment_executor.py @@ -12,9 +12,9 @@ def execute(subexperiment_config): partitions=subexperiment_config.partitions, cpu_limit=subexperiment_config.cpu_limit, memory_limit=subexperiment_config.memory_limit, - commit_interval_ms=subexperiment_config.kafka_streams_commit_interval_ms, execution_minutes=int(subexperiment_config.execution_minutes), prometheus_base_url=subexperiment_config.prometheus_base_url, reset=subexperiment_config.reset, ns=subexperiment_config.namespace, - result_path=subexperiment_config.result_path) + result_path=subexperiment_config.result_path, + configurations=subexperiment_config.configurations) diff --git a/execution/theodolite.py b/execution/theodolite.py index 22be2f69ab81d81b7aac7717041604cd368e771f..ef218d99ce0c0ac695c5a5fa3df3ebe4182b671f 100755 --- a/execution/theodolite.py +++ b/execution/theodolite.py @@ -30,8 +30,8 @@ def load_variables(): def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit, - commit_ms, duration, domain_restriction, search_strategy, - prometheus_base_url, reset, namespace, result_path): + duration, domain_restriction, search_strategy, prometheus_base_url, + reset, namespace, result_path, configurations): print(f"Domain restriction of search space activated: {domain_restriction}") print(f"Chosen search strategy: {search_strategy}") @@ -49,16 +49,16 @@ def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit, # Store metadata separator = "," lines = [ - f"UC={uc}\n", - f"DIM_VALUES={separator.join(map(str, loads))}\n", - f"REPLICAS={separator.join(map(str, instances_list))}\n", - f"PARTITIONS={partitions}\n", - f"CPU_LIMIT={cpu_limit}\n", - f"MEMORY_LIMIT={memory_limit}\n", - f"KAFKA_STREAMS_COMMIT_INTERVAL_MS={commit_ms}\n", - f"EXECUTION_MINUTES={duration}\n", - f"DOMAIN_RESTRICTION={domain_restriction}\n", - f"SEARCH_STRATEGY={search_strategy}" + f'UC={uc}\n', + f'DIM_VALUES={separator.join(map(str, loads))}\n', + f'REPLICAS={separator.join(map(str, instances_list))}\n', + f'PARTITIONS={partitions}\n', + f'CPU_LIMIT={cpu_limit}\n', + f'MEMORY_LIMIT={memory_limit}\n', + f'EXECUTION_MINUTES={duration}\n', + f'DOMAIN_RESTRICTION={domain_restriction}\n', + f'SEARCH_STRATEGY={search_strategy}\n', + f'CONFIGURATIONS={configurations}' ] with open(f"{result_path}/exp{exp_id}_uc{uc}_meta.txt", "w") as stream: stream.writelines(lines) @@ -95,11 +95,11 @@ def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit, partitions=partitions, cpu_limit=cpu_limit, memory_limit=memory_limit, - kafka_streams_commit_interval_ms=commit_ms, execution_minutes=duration, prometheus_base_url=prometheus_base_url, reset=reset, namespace=namespace, + configurations=configurations, result_path=result_path, domain_restriction_strategy=domain_restriction_strategy, search_strategy=search_strategy, @@ -114,6 +114,6 @@ if __name__ == '__main__': logging.basicConfig(level=logging.INFO) args = load_variables() main(args.uc, args.loads, args.instances_list, args.partitions, args.cpu_limit, - args.memory_limit, args.commit_ms, args.duration, + args.memory_limit, args.duration, args.domain_restriction, args.search_strategy, args.prometheus, - args.reset, args.namespace, args.path) + args.reset, args.namespace, args.path, args.configurations) diff --git a/execution/uc-application/base/aggregation-deployment.yaml b/execution/uc-application/base/aggregation-deployment.yaml index 81da3eea7688f5d3b3145092d91cb8502e6ad87b..12bf9e718e81d90f308c7c57012ae0f558a7a7b6 100644 --- a/execution/uc-application/base/aggregation-deployment.yaml +++ b/execution/uc-application/base/aggregation-deployment.yaml @@ -20,8 +20,6 @@ spec: - containerPort: 5555 name: jmx env: - - name: COMMIT_INTERVAL_MS - value: "100" - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" - name: SCHEMA_REGISTRY_URL