Skip to content
Snippets Groups Projects
Commit cb44eb77 authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Remove the commit interval option and use instead kv pairs for config

Remove the commit interval option as it is espacially for kafka
streams. Instead use the configurations option, which takes key
value pairs and set these as environment variables for the UC.
parent 0bf928f2
No related branches found
No related tags found
No related merge requests found
......@@ -63,11 +63,6 @@ def default_parser(description):
metavar='<memory limit>',
default=os.environ.get('MEMORY_LIMIT', '4Gi'),
help='Kubernetes memory limit')
parser.add_argument('--commit-ms',
metavar='<commit ms>',
type=int,
default=os.environ.get('COMMIT_MS', 100),
help='Kafka Streams commit interval in milliseconds')
parser.add_argument('--duration', '-d',
metavar='<duration>',
type=int,
......@@ -93,11 +88,12 @@ def default_parser(description):
default=os.environ.get('RESULT_PATH', 'results'),
help='A directory path for the results')
parser.add_argument("--configurations",
metavar="KEY=VAL",
dest="configurations",
action=StoreDictKeyPair,
nargs="+",
default=env_dict_default('CONFIGURATIONS'),
metavar="KEY=VAL")
help='Defines the environment variables for the UC')
return parser
def benchmark_parser(description):
......
......@@ -158,7 +158,9 @@ def start_workload_generator(wg_yaml, dim_value, uc_id):
return wg_yaml
def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instances, uc_id, commit_interval_ms, memory_limit, cpu_limit):
def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml,
instances, uc_id, memory_limit, cpu_limit,
configurations):
"""Applies the service, service monitor, jmx config map and start the
use case application.
......@@ -168,9 +170,9 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instanc
:param deploy_yaml: The yaml object for the application.
:param int instances: Number of instances for use case application.
:param string uc_id: The id of the use case to execute.
:param int commit_interval_ms: The commit interval in ms.
:param string memory_limit: The memory limit for the application.
:param string cpu_limit: The CPU limit for the application.
:param dict configurations: A dictionary with ENV variables for configurations.
:return:
The Service, ServiceMonitor, JMX ConfigMap and Deployment.
In case the resource already exist/error the yaml object is returned.
......@@ -217,10 +219,17 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instanc
lambda x: x['name'] == 'uc-application', deploy_yaml['spec']['template']['spec']['containers']))
app_container['image'] = 'theodolite/theodolite-uc' + uc_id \
+ '-kstreams-app:latest'
next(filter(lambda x: x['name'] == 'COMMIT_INTERVAL_MS', app_container['env']))[
'value'] = str(commit_interval_ms)
# Set configurations environment parameters for SPE
for k,v in configurations.items():
conf = {'name': k, 'value': v}
app_container['env'].append(conf)
# Set resources in Kubernetes
app_container['resources']['limits']['memory'] = memory_limit
app_container['resources']['limits']['cpu'] = cpu_limit
# Deploy application
try:
app_deploy = appsApi.create_namespaced_deployment(
namespace=namespace,
......@@ -455,7 +464,7 @@ def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics):
stop_lag_exporter()
def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, prometheus_base_url, reset, ns, result_path, reset_only=False):
def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, execution_minutes, prometheus_base_url, reset, ns, result_path, configurations, reset_only=False):
"""
Main method to execute one time the benchmark for a given use case.
Start workload generator/application -> execute -> analyse -> stop all
......@@ -466,9 +475,9 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi
:param int partitions: Number of partitions the kafka topics should have.
:param string cpu_limit: Max CPU utilazation for application.
:param string memory_limit: Max memory utilazation for application.
:param int commit_interval_ms: Kafka Streams commit interval in milliseconds
:param int execution_minutes: How long to execute the benchmark.
:param boolean reset: Flag for reset of cluster before execution.
:param dict configurations: Key value pairs for setting env variables of UC.
:param boolean reset_only: Flag to only reset the application.
"""
global namespace
......@@ -514,9 +523,9 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi
app_deploy,
instances,
uc_id,
commit_interval_ms,
memory_limit,
cpu_limit)
cpu_limit,
configurations)
print('---------------------')
wait_execution(execution_minutes)
......@@ -535,7 +544,7 @@ if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
args = load_variables()
print('---------------------')
main(args.exp_id, args.uc, args.load, args.instances,
args.partitions, args.cpu_limit, args.memory_limit,
args.commit_ms, args.duration, args.prometheus, args.reset,
args.namespace, args.path, args.reset_only)
main(args.exp_id, args.uc, args.load, args.instances, args.partitions,
args.cpu_limit, args.memory_limit, args.duration, args.prometheus,
args.reset, args.namespace, args.path, args.configurations,
args.reset_only)
......@@ -10,12 +10,12 @@ class ExperimentConfig:
partitions: int
cpu_limit: str
memory_limit: str
kafka_streams_commit_interval_ms: int
execution_minutes: int
prometheus_base_url: str
reset: bool
namespace: str
result_path: str
configurations: dict
domain_restriction_strategy: object
search_strategy: object
subexperiment_executor: object
......
......@@ -11,9 +11,9 @@ class SubexperimentConfig:
partitions: int
cpu_limit: str
memory_limit: str
kafka_streams_commit_interval_ms: int
execution_minutes: int
prometheus_base_url: str
reset: bool
namespace: str
result_path: str
configurations: dict
......@@ -5,7 +5,7 @@ from strategies.strategies.config import SubexperimentConfig
def binary_search(config, dim_value, lower, upper, subexperiment_counter):
if lower == upper:
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result==1: # successful, the upper neighbor is assumed to also has been successful
......@@ -14,14 +14,14 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter):
return (lower+1, subexperiment_counter)
elif lower+1==upper:
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result==1: # minimal instances found
return (lower, subexperiment_counter)
else: # not successful, check if lower+1 instances are sufficient
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[upper]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result == 1: # minimal instances found
......@@ -32,7 +32,7 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter):
# test mid
mid=(upper+lower)//2
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[mid]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result == 1: # success -> search in (lower, mid-1)
......
......@@ -12,7 +12,7 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c
replicas=config.replicass[lower_replicas_bound_index]
print(f"Run subexperiment {subexperiment_counter} of {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
config.subexperiment_executor.execute(subexperiment_config)
......
......@@ -11,7 +11,7 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c
replicas=config.replicass[lower_replicas_bound_index]
print(f"Run subexperiment {subexperiment_counter} from at most {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
......
......@@ -12,9 +12,9 @@ def execute(subexperiment_config):
partitions=subexperiment_config.partitions,
cpu_limit=subexperiment_config.cpu_limit,
memory_limit=subexperiment_config.memory_limit,
commit_interval_ms=subexperiment_config.kafka_streams_commit_interval_ms,
execution_minutes=int(subexperiment_config.execution_minutes),
prometheus_base_url=subexperiment_config.prometheus_base_url,
reset=subexperiment_config.reset,
ns=subexperiment_config.namespace,
result_path=subexperiment_config.result_path)
result_path=subexperiment_config.result_path,
configurations=subexperiment_config.configurations)
......@@ -30,8 +30,8 @@ def load_variables():
def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit,
commit_ms, duration, domain_restriction, search_strategy,
prometheus_base_url, reset, namespace, result_path):
duration, domain_restriction, search_strategy, prometheus_base_url,
reset, namespace, result_path, configurations):
print(f"Domain restriction of search space activated: {domain_restriction}")
print(f"Chosen search strategy: {search_strategy}")
......@@ -49,16 +49,16 @@ def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit,
# Store metadata
separator = ","
lines = [
f"UC={uc}\n",
f"DIM_VALUES={separator.join(map(str, loads))}\n",
f"REPLICAS={separator.join(map(str, instances_list))}\n",
f"PARTITIONS={partitions}\n",
f"CPU_LIMIT={cpu_limit}\n",
f"MEMORY_LIMIT={memory_limit}\n",
f"KAFKA_STREAMS_COMMIT_INTERVAL_MS={commit_ms}\n",
f"EXECUTION_MINUTES={duration}\n",
f"DOMAIN_RESTRICTION={domain_restriction}\n",
f"SEARCH_STRATEGY={search_strategy}"
f'UC={uc}\n',
f'DIM_VALUES={separator.join(map(str, loads))}\n',
f'REPLICAS={separator.join(map(str, instances_list))}\n',
f'PARTITIONS={partitions}\n',
f'CPU_LIMIT={cpu_limit}\n',
f'MEMORY_LIMIT={memory_limit}\n',
f'EXECUTION_MINUTES={duration}\n',
f'DOMAIN_RESTRICTION={domain_restriction}\n',
f'SEARCH_STRATEGY={search_strategy}\n',
f'CONFIGURATIONS={configurations}'
]
with open(f"{result_path}/exp{exp_id}_uc{uc}_meta.txt", "w") as stream:
stream.writelines(lines)
......@@ -95,11 +95,11 @@ def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit,
partitions=partitions,
cpu_limit=cpu_limit,
memory_limit=memory_limit,
kafka_streams_commit_interval_ms=commit_ms,
execution_minutes=duration,
prometheus_base_url=prometheus_base_url,
reset=reset,
namespace=namespace,
configurations=configurations,
result_path=result_path,
domain_restriction_strategy=domain_restriction_strategy,
search_strategy=search_strategy,
......@@ -114,6 +114,6 @@ if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
args = load_variables()
main(args.uc, args.loads, args.instances_list, args.partitions, args.cpu_limit,
args.memory_limit, args.commit_ms, args.duration,
args.memory_limit, args.duration,
args.domain_restriction, args.search_strategy, args.prometheus,
args.reset, args.namespace, args.path)
args.reset, args.namespace, args.path, args.configurations)
......@@ -20,8 +20,6 @@ spec:
- containerPort: 5555
name: jmx
env:
- name: COMMIT_INTERVAL_MS
value: "100"
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment