diff --git a/execution/requirements.txt b/execution/requirements.txt
index 7224efe80aa1686bb3de90b2beac5df47a56ed8f..18a06882007eebf69bf3bf4f84b869454b36a0a6 100644
--- a/execution/requirements.txt
+++ b/execution/requirements.txt
@@ -1,4 +1,8 @@
 matplotlib==3.2.0
 pandas==1.0.1
 requests==2.23.0
-scikit-learn==0.22.2.post1
\ No newline at end of file
+scikit-learn==0.22.2.post1
+
+# For run_uc.py
+kubernetes==11.0.0
+confuse==1.1.0
diff --git a/execution/run_uc.py b/execution/run_uc.py
new file mode 100644
index 0000000000000000000000000000000000000000..6ae19307b7e4aca1331797a5a4974c25c1de5420
--- /dev/null
+++ b/execution/run_uc.py
@@ -0,0 +1,312 @@
+from kubernetes import client, config  # kubernetes api
+from kubernetes.stream import stream
+# import confuse
+import argparse  # parse arguments from cli
+import subprocess  # execute bash commands
+import time  # process sleep
+
+v1 = None  # access kubernetes api
+args = None  # CLI arguments
+
+
+def load_variables():
+    global args
+    print('Load CLI variables')
+    parser = argparse.ArgumentParser(description='Run use case program')
+    parser.add_argument('--use-case', '-uc',
+                        dest='uc_id',
+                        default='1',
+                        metavar='UC_NUMBER',
+                        help='use case number, one of 1, 2, 3 or 4')
+    parser.add_argument('--dim-value', '-d',
+                        dest='dim_value',
+                        default=10000,
+                        type=int,
+                        metavar='DIM_VALUE',
+                        help='Value for the workload generator to be tested')
+    parser.add_argument('--instances', '-i',
+                        dest='instances',
+                        default=1,
+                        type=int,
+                        metavar='INSTANCES',
+                        help='Numbers of instances to be benchmarked')
+    parser.add_argument('--partitions', '-p',
+                        dest='partitions',
+                        default=40,
+                        type=int,
+                        metavar='PARTITIONS',
+                        help='Number of partitions for Kafka topics')
+    parser.add_argument('--cpu-limit', '-cpu',
+                        dest='cpu_limit',
+                        default='1000m',
+                        metavar='CPU_LIMIT',
+                        help='Kubernetes CPU limit')
+    parser.add_argument('--memory-limit', '-mem',
+                        dest='memory_limit',
+                        default='4Gi',
+                        metavar='MEMORY_LIMIT',
+                        help='Kubernetes memory limit')
+    parser.add_argument('--commit-interval', '-ci',
+                        dest='commit_interval_ms',
+                        default=100,
+                        type=int,
+                        metavar='KAFKA_STREAMS_COMMIT_INTERVAL_MS',
+                        help='Kafka Streams commit interval in milliseconds')
+    parser.add_argument('--executions-minutes', '-exm',
+                        dest='execution_minutes',
+                        default=5,
+                        type=int,
+                        metavar='EXECUTION_MINUTES',
+                        help='Duration in minutes subexperiments should be \
+                        executed for')
+
+    args = parser.parse_args()
+    print(args)
+
+
+def initialize_kubernetes_api():
+    global v1
+    print('Connect to kubernetes api')
+    try:
+        config.load_kube_config()  # try using local config
+    except Exception:
+        # load config from pod, if local config is not available
+        config.load_incluster_config()
+
+    v1 = client.CoreV1Api()
+
+
+def create_topics(topics):
+    # Calling exec and waiting for response
+    print('Create topics')
+    for (topic, partitions) in topics:
+        print('Create topic ' + topic + ' with #' + str(partitions)
+              + ' partitions')
+        exec_command = [
+            '/bin/sh',
+            '-c',
+            f'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181\
+            --create --topic {topic} --partitions {partitions}\
+            --replication-factor 1'
+        ]
+        resp = stream(v1.connect_get_namespaced_pod_exec,
+                      "kafka-client",
+                      'default',
+                      command=exec_command,
+                      stderr=True, stdin=False,
+                      stdout=True, tty=False)
+        print(resp)
+
+
+def start_workload_generator():
+    print('Start workload generator')
+    num_sensors = args.dim_value
+    wl_max_records = 150000
+    # TODO: How is this calculation done?
+    wl_instances = int(((num_sensors + (wl_max_records - 1)) / wl_max_records))
+
+    parameters_path = 'uc-workload-generator/overlay/uc' + args.uc_id\
+        + '-workload-generator'
+    f = open(parameters_path + '/set_paramters.yaml', 'w')
+    f.write('\
+apiVersion: apps/v1\n\
+kind: StatefulSet\n\
+metadata:\n\
+  name: titan-ccp-load-generator\n\
+spec:\n\
+  replicas: ' + str(wl_instances) + '\n\
+  template:\n\
+    spec:\n\
+      containers:\n\
+      - name: workload-generator\n\
+        env:\n\
+        - name: NUM_SENSORS\n\
+          value: "' + str(num_sensors) + '"\n\
+        - name: INSTANCES\n\
+          value: "' + str(wl_instances) + '"\n')
+    f.close()
+
+    exec_command = [
+        'kubectl',
+        'apply',
+        '-k',
+        parameters_path
+    ]
+    output = subprocess.run(exec_command, capture_output=True, text=True)
+    print(output)
+    return
+
+
+def start_application():
+    print('Start use case applications')
+
+    parameters_path = 'uc-application/overlay/uc' + args.uc_id + '-application'
+    f = open(parameters_path + '/set_paramters.yaml', 'w')
+    f.write('\
+apiVersion: apps/v1\n\
+kind: Deployment\n\
+metadata:\n\
+  name: titan-ccp-aggregation\n\
+spec:\n\
+  template:\n\
+    spec:\n\
+      containers:\n\
+      - name: uc-application\n\
+        env:\n\
+        - name: COMMIT_INTERVAL_MS\n\
+          value: "' + str(args.commit_interval_ms) + '"\n\
+        resources:\n\
+          limits:\n\
+            memory: ' + str(args.memory_limit) + '\n\
+            cpu: ' + str(args.cpu_limit) + '\n')  # cpu limit is already str
+    f.close()
+
+    exec_command = [
+        'kubectl',
+        'apply',
+        '-k',
+        parameters_path
+    ]
+    output = subprocess.run(exec_command, capture_output=True, text=True)
+    print(output)
+    exec_command = [
+        'kubectl',
+        'scale',
+        'deployment',
+        'uc' + args.uc_id + '-titan-ccp-aggregation',
+        '--replicas=' + str(args.instances)
+    ]
+    output = subprocess.run(exec_command, capture_output=True, text=True)
+    print(output)
+    return
+
+
+def wait_execution():
+    print('Wait while executing')
+    # TODO: ask which fits better
+    # time.sleep(args.execution_minutes * 60)
+    for i in range(args.execution_minutes):
+        time.sleep(60)
+        print(f"Executed: {i+1} minutes")
+    print('Execution finished')
+    return
+
+
+def run_evaluation_script():
+    # TODO: implement
+    # # Run eval script
+    # source ../.venv/bin/activate
+    # python lag_analysis.py $EXP_ID uc1 $DIM_VALUE $INSTANCES
+    # $EXECUTION_MINUTES
+    # deactivate
+    return
+
+
+def stop_applications():
+    print('Stop use case application and workload generator')
+    exec_command = [
+        'kubectl',
+        'delete',
+        '-k',
+        'uc-workload-generator/overlay/uc' + args.uc_id + '-workload-generator'
+    ]
+    output = subprocess.run(exec_command, capture_output=True, text=True)
+    print(output)
+
+    exec_command[3] = 'uc-application/overlay/uc' + args.uc_id + '-application'
+    output = subprocess.run(exec_command, capture_output=True, text=True)
+    print(output)
+    return
+
+
+def delete_topics(topics):
+    print('Delete topics from Kafka')
+
+    num_topics_command = [
+        '/bin/sh',
+        '-c',
+        f'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list | sed -n -E "/^(theodolite-.*|input|output|configuration)( - marked for deletion)?$/p" | wc -l'
+    ]
+
+    topics_deletion_command = [
+        '/bin/sh',
+        '-c',
+        f'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic "input|output|configuration|theodolite-.*"'
+    ]
+
+    # Wait that topics get deleted
+    while True:
+        # topic deletion, sometimes a second deletion seems to be required
+        resp = stream(v1.connect_get_namespaced_pod_exec,
+                      "kafka-client",
+                      'default',
+                      command=topics_deletion_command,
+                      stderr=True, stdin=False,
+                      stdout=True, tty=False)
+        print(resp)
+
+        print('Wait for topic deletion')
+        time.sleep(5)
+        resp = stream(v1.connect_get_namespaced_pod_exec,
+                      "kafka-client",
+                      'default',
+                      command=num_topics_command,
+                      stderr=True, stdin=False,
+                      stdout=True, tty=False)
+        if resp == '0':
+            print("Topics deleted")
+            break
+    return
+
+
+def stop_lag_exporter():
+    print('Stop the lag exporter')
+
+    find_pod_command = [
+        'kubectl',
+        'get',
+        'pod',
+        '-l',
+        'app.kubernetes.io/name=kafka-lag-exporter',
+        '-o',
+        'jsonpath="{.items[0].metadata.name}"'
+    ]
+    output = subprocess.run(find_pod_command, capture_output=True, text=True)
+    lag_exporter_pod = output.stdout.replace('"', '')
+    delete_pod_command = [
+        'kubectl',
+        'delete',
+        'pod',
+        lag_exporter_pod
+    ]
+    output = subprocess.run(delete_pod_command, capture_output=True, text=True)
+    print(output)
+    return
+
+
+def main():
+    load_variables()
+    print('---------------------')
+
+    topics = [('input', args.partitions),
+              ('output', args.partitions),
+              ('configuration', 1)]
+
+    initialize_kubernetes_api()
+    print('---------------------')
+    create_topics(topics)
+    print('---------------------')
+    start_workload_generator()
+    print('---------------------')
+    start_application()
+    print('---------------------')
+    wait_execution()
+    print('---------------------')
+    stop_applications()
+    print('---------------------')
+    delete_topics(topics)
+    print('---------------------')
+    stop_lag_exporter()
+
+if __name__ == '__main__':
+    main()