Skip to content
Snippets Groups Projects
Commit bcdd5d63 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'feature/cliArgumentsTheodolite' into 'master'

Enhanced CLI arguments for theodolite

Closes #83

See merge request !44
parents 929f3430 d94ee0a6
No related branches found
No related tags found
1 merge request: !44 "Enhanced CLI arguments for theodolite"
Pipeline #1170 passed
......@@ -5,6 +5,7 @@ from kubernetes.stream import stream
import lag_analysis
import logging # logging
from os import path # path utilities
from strategies.cli_parser import execution_parser
import subprocess # execute bash commands
import sys # for exit of program
import time # process sleep
......@@ -17,71 +18,14 @@ customApi = None # acces kubernetes custom object api
def load_variables():
    """Load and validate the CLI variables given at the command line.

    Builds the shared execution parser (see strategies/cli_parser.py) and
    exits the program with status 1 when any mandatory option is missing.

    :return: the parsed ``argparse.Namespace``.
    """
    # NOTE(review): the previous revision built a local argparse parser here
    # and then immediately discarded it; the parser definition now lives in
    # strategies.cli_parser.execution_parser, so only the live code is kept.
    global args  # keep the module-level `args` in sync for other functions
    print('Load CLI variables')
    parser = execution_parser(description='Run use case Programm')
    args = parser.parse_args()
    print(args)
    # --exp-id, --uc, --load and --instances have no defaults; enforce them.
    if args.exp_id is None or args.uc is None or args.load is None or args.instances is None:
        print('The options --exp-id, --uc, --load and --instances are mandatory.')
        print('Some might not be set!')
        sys.exit(1)
    return args
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    # load_variables() exits the process when a mandatory option is missing.
    args = load_variables()
    print('---------------------')
    # NOTE(review): the diff interleave duplicated the main(...) call with
    # both the old (uc_id/dim_value/commit_interval_ms/execution_minutes)
    # and new (uc/load/commit_ms/duration) argument names; only the
    # post-merge call matching the new parser destinations is kept.
    main(args.exp_id, args.uc, args.load, args.instances,
         args.partitions, args.cpu_limit, args.memory_limit,
         args.commit_ms, args.duration, args.reset,
         args.reset_only)
import argparse
def default_parser(description):
    """
    Return the default parser that can be used for theodolite and run uc py.

    :param description: The description the argument parser should show.
    :return: an ``argparse.ArgumentParser`` preloaded with the options
        shared by all entry points.
    """
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('--uc',
                        metavar='<uc>',
                        help='[mandatory] use case number, one of 1, 2, 3 or 4')
    parser.add_argument('--partitions', '-p',
                        default=40,
                        type=int,
                        metavar='<partitions>',
                        help='Number of partitions for Kafka topics')
    parser.add_argument('--cpu-limit', '-cpu',
                        default='1000m',
                        metavar='<CPU limit>',
                        help='Kubernetes CPU limit')
    parser.add_argument('--memory-limit', '-mem',
                        default='4Gi',
                        metavar='<memory limit>',
                        help='Kubernetes memory limit')
    parser.add_argument('--commit-ms',
                        default=100,
                        type=int,
                        metavar='<commit ms>',
                        help='Kafka Streams commit interval in milliseconds')
    # Single-line help string: the previous backslash line continuation
    # embedded the source indentation inside the literal.
    parser.add_argument('--duration', '-d',
                        default=5,
                        type=int,
                        metavar='<duration>',
                        help='Duration in minutes subexperiments should be executed for')
    parser.add_argument('--reset',
                        action="store_true",
                        help='Resets the environment before execution')
    parser.add_argument('--reset-only',
                        action="store_true",
                        help='Only resets the environment. Ignores all other parameters')
    return parser
def benchmark_parser(description):
    """
    Build the parser for the overall benchmark execution (theodolite).

    Extends the default parser with the load/instance search space and the
    search-strategy options.

    :param description: The description the argument parser should show.
    """
    parser = default_parser(description)
    parser.add_argument(
        '--loads',
        type=int,
        metavar='<load>',
        nargs='+',
        help='[mandatory] Loads that should be executed')
    parser.add_argument(
        '--instances', '-i',
        dest='instances_list',
        type=int,
        metavar='<instances>',
        nargs='+',
        help='[mandatory] List of instances used in benchmarks')
    parser.add_argument(
        '--domain-restriction',
        action="store_true",
        help='To use domain restriction. For details see README')
    parser.add_argument(
        '--search-strategy',
        default='default',
        metavar='<strategy>',
        help='The benchmarking search strategy. Can be set to default, linear-search or binary-search')
    return parser
def execution_parser(description):
    """
    Build the parser for executing one use case (run_uc).

    Extends the default parser with the experiment id and the single
    load/instance combination to benchmark.

    :param description: The description the argument parser should show.
    """
    parser = default_parser(description)
    parser.add_argument('--exp-id',
                        metavar='<exp id>',
                        help='[mandatory] ID of the experiment')
    # Typo fix in user-facing help: "benchmakr" -> "benchmark".
    parser.add_argument('--load',
                        type=int,
                        metavar='<load>',
                        help='[mandatory] Load that should be used for benchmark')
    parser.add_argument('--instances',
                        type=int,
                        metavar='<instances>',
                        help='[mandatory] Numbers of instances to be benchmarked')
    return parser
#!/usr/bin/env python
import sys
import argparse
import logging # logging
import os
import sys
from strategies.config import ExperimentConfig
from strategies.cli_parser import benchmark_parser
import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy
import strategies.strategies.domain_restriction.no_lower_bound_strategy as no_lower_bound_strategy
import strategies.strategies.search.check_all_strategy as check_all_strategy
......@@ -12,150 +15,164 @@ from strategies.experiment_execution import ExperimentExecutor
import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor
import strategies.subexperiment_evaluation.subexperiment_evaluator as subexperiment_evaluator
# NOTE(review): this span is the removed (pre-merge) side of the diff —
# positional sys.argv parsing that the merge replaces with the argparse
# based load_variables()/benchmark_parser below. Kept byte-identical here.
uc=sys.argv[1]
dim_values=sys.argv[2].split(',')  # comma-separated workload dimension values
replicas=sys.argv[3].split(',')  # comma-separated instance counts
partitions=sys.argv[4] if len(sys.argv) >= 5 and sys.argv[4] else 40
cpu_limit=sys.argv[5] if len(sys.argv) >= 6 and sys.argv[5] else "1000m"
memory_limit=sys.argv[6] if len(sys.argv) >= 7 and sys.argv[6] else "4Gi"
kafka_streams_commit_interval_ms=sys.argv[7] if len(sys.argv) >= 8 and sys.argv[7] else 100
execution_minutes=sys.argv[8] if len(sys.argv) >= 9 and sys.argv[8] else 5
# only the exact token "restrict-domain" enables domain restriction
domain_restriction=bool(sys.argv[9]) if len(sys.argv) >= 10 and sys.argv[9] == "restrict-domain" else False
# anything other than linear-search/binary-search falls back to check-all
search_strategy=sys.argv[10] if len(sys.argv) >= 11 and (sys.argv[10] == "linear-search" or sys.argv[10] == "binary-search") else "check-all"
print(f"Domain restriction of search space activated: {domain_restriction}")
print(f"Chosen search strategy: {search_strategy}")
def load_variables():
    """Parse and validate the benchmark CLI arguments.

    Exits the program with status 1 when one of the mandatory options
    (--uc, --loads, --instances) is missing.
    """
    print('Load CLI variables')
    args = benchmark_parser("Run theodolite benchmarking").parse_args()
    print(args)
    missing_mandatory = (args.uc is None
                         or args.loads is None
                         or args.instances_list is None)
    if missing_mandatory:
        print('The options --uc, --loads and --instances are mandatory.')
        print('Some might not be set!')
        sys.exit(1)
    return args
# NOTE(review): removed (pre-merge) side of the diff — the experiment
# counter handling now lives inside main() (see the merged version below).
# Reads the persisted experiment id, defaulting to 0, then writes back the
# incremented value so each run gets a unique id.
if os.path.exists("exp_counter.txt"):
    with open("exp_counter.txt", mode="r") as read_stream:
        exp_id = int(read_stream.read())
else:
    exp_id = 0
with open("exp_counter.txt", mode="w") as write_stream:
    write_stream.write(str(exp_id+1))
def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit,
         commit_ms, duration, domain_restriction, search_strategy,
         reset, reset_only):
    """Run the theodolite benchmark for one use case.

    Persists an experiment counter and a metadata file, selects the domain
    restriction and search strategy modules, builds one ExperimentConfig
    and executes it.

    :param uc: use case number.
    :param loads: list of int loads to benchmark.
    :param instances_list: list of int instance counts to benchmark.
    :param partitions: number of Kafka topic partitions.
    :param cpu_limit: Kubernetes CPU limit.
    :param memory_limit: Kubernetes memory limit.
    :param commit_ms: Kafka Streams commit interval in milliseconds.
    :param duration: subexperiment duration in minutes.
    :param domain_restriction: whether to restrict the search domain.
    :param search_strategy: 'linear-search', 'binary-search' or check-all.
    :param reset: reset the environment before execution
        (presumably consumed by the subexperiment executor — TODO confirm,
        not referenced in this function).
    :param reset_only: only reset the environment (same note as reset).
    """
    # NOTE(review): this function was reconstructed from the added side of
    # an interleaved diff; the removed side duplicated the counter, metadata
    # and branch blocks using the old dim_values/replicas names.
    print(f"Domain restriction of search space activated: {domain_restriction}")
    print(f"Chosen search strategy: {search_strategy}")

    # Read and increment the persistent experiment counter so every run
    # gets a unique experiment id.
    if os.path.exists("exp_counter.txt"):
        with open("exp_counter.txt", mode="r") as read_stream:
            exp_id = int(read_stream.read())
    else:
        exp_id = 0
    with open("exp_counter.txt", mode="w") as write_stream:
        write_stream.write(str(exp_id + 1))

    # Store metadata describing this experiment next to its results.
    separator = ","
    lines = [
        f"UC={uc}\n",
        # BUG FIX: loads/instances_list hold ints (argparse type=int);
        # str.join requires strings, so convert explicitly.
        f"DIM_VALUES={separator.join(map(str, loads))}\n",
        f"REPLICAS={separator.join(map(str, instances_list))}\n",
        f"PARTITIONS={partitions}\n",
        f"CPU_LIMIT={cpu_limit}\n",
        f"MEMORY_LIMIT={memory_limit}\n",
        f"KAFKA_STREAMS_COMMIT_INTERVAL_MS={commit_ms}\n",
        f"EXECUTION_MINUTES={duration}\n",
        f"DOMAIN_RESTRICTION={domain_restriction}\n",
        f"SEARCH_STRATEGY={search_strategy}"
    ]
    with open(f"exp{exp_id}_uc{uc}_meta.txt", "w") as stream:
        stream.writelines(lines)

    # The six former branches only differed in the two strategy modules and
    # the progress message, so select those up front and build one config.
    if domain_restriction:
        restriction_strategy = lower_bound_strategy
        if search_strategy == "linear-search":
            print(f"Going to execute at most {len(loads)+len(instances_list)-1} subexperiments in total..")
            chosen_search_strategy = linear_search_strategy
        elif search_strategy == "binary-search":
            chosen_search_strategy = binary_search_strategy
        else:
            # check_all fallback (also used for the 'default' strategy)
            print(f"Going to execute {len(loads)*len(instances_list)} subexperiments in total..")
            chosen_search_strategy = check_all_strategy
    else:
        restriction_strategy = no_lower_bound_strategy
        if search_strategy == "linear-search":
            print(f"Going to execute at most {len(loads)*len(instances_list)} subexperiments in total..")
            chosen_search_strategy = linear_search_strategy
        elif search_strategy == "binary-search":
            chosen_search_strategy = binary_search_strategy
        else:
            print(f"Going to execute {len(loads)*len(instances_list)} subexperiments in total..")
            chosen_search_strategy = check_all_strategy

    experiment_config = ExperimentConfig(
        use_case=uc,
        exp_id=exp_id,
        dim_values=loads,
        replicass=instances_list,
        partitions=partitions,
        cpu_limit=cpu_limit,
        memory_limit=memory_limit,
        kafka_streams_commit_interval_ms=commit_ms,
        execution_minutes=duration,
        domain_restriction_strategy=restriction_strategy,
        search_strategy=chosen_search_strategy,
        subexperiment_executor=subexperiment_executor,
        subexperiment_evaluator=subexperiment_evaluator)

    executor = ExperimentExecutor(experiment_config)
    executor.execute()
\ No newline at end of file
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    # Parse and validate CLI options; load_variables() exits the process
    # when a mandatory option is missing.
    args = load_variables()
    # Hand every parsed option through to main() positionally.
    main(args.uc, args.loads, args.instances_list, args.partitions, args.cpu_limit,
         args.memory_limit, args.commit_ms, args.duration,
         args.domain_restriction, args.search_strategy, args.reset,
         args.reset_only)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment