Skip to content
Snippets Groups Projects
Select Git revision
  • 0b442a468607fce2abbeab36d24add7c3a27557d
  • master default
  • 1.12
  • 1.11
  • 1.10
  • 1.9
  • 1.8
  • 1.7
  • 1.6-beta
9 results

main.xhtml

Blame
  • main.py 1.86 KiB
    from fastapi import FastAPI,Request
    import trend_slope_computer as trend_slope_computer
    import logging
    import os
    import pandas as pd
    import json
    import sys
    from statistics import median
    
    # FastAPI application instance; endpoints in this module attach themselves
    # to it through decorators such as @app.post.
    app = FastAPI()
    
    # Send log records to stdout using one shared line format.
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        stream=sys.stdout,
    )
    logger = logging.getLogger("API")

    # Only these three LOG_LEVEL values are recognised. For any other value,
    # or when the variable is unset, the logger's level is left untouched.
    _requested_level = {
        'INFO': logging.INFO,
        'WARNING': logging.WARNING,
        'DEBUG': logging.DEBUG,
    }.get(os.getenv('LOG_LEVEL'))
    if _requested_level is not None:
        logger.setLevel(_requested_level)
    
    def calculate_slope_trend(results, warmup):
        """Flatten the given series into a data frame and compute its trend slope.

        Args:
            results: Iterable of mappings. Each one must provide a 'metric'
                mapping (its optional 'consumergroup' entry names the group,
                falling back to "default") and a 'values' iterable of samples
                where index 0 is the timestamp and index 1 is the value.
                NOTE(review): this looks like a Prometheus range-query
                result -- confirm against the caller.
            warmup: Passed through unchanged to trend_slope_computer.compute;
                the log message describes it as a number of seconds.

        Returns:
            Whatever trend_slope_computer.compute returns, or float('inf')
            when that call raises any Exception.
        """
        rows = []
        for series in results:
            # Looked up once per series, before its samples are touched.
            consumer_group = series['metric'].get('consumergroup', "default")
            rows.extend(
                {
                    'group': consumer_group,
                    'timestamp': int(sample[0]),
                    # The literal string 'NaN' is recorded as 0.
                    'value': 0 if sample[1] == 'NaN' else float(sample[1]),
                }
                for sample in series['values']
            )

        frame = pd.DataFrame(rows)

        logger.info("Calculating trend slope with warmup of %s seconds for data frame:\n %s", warmup, frame)
        try:
            slope = trend_slope_computer.compute(frame, warmup)
        except Exception:
            # A failed computation is reported as an infinite slope rather
            # than being propagated to the caller.
            logger.exception('Computing trend slope failed.')
            logger.error('Mark this subexperiment as not successful and continue benchmark.')
            return float('inf')
        else:
            logger.info("Computed lag trend slope is '%s'", slope)
            return slope
    
    def check_service_level_objective(results, threshold):
        """Report whether the median of ``results`` lies strictly below ``threshold``.

        Args:
            results: Numeric values whose median is evaluated.
            threshold: Exclusive upper bound for that median.

        Returns:
            True when the median is strictly less than ``threshold``,
            otherwise False.

        Raises:
            statistics.StatisticsError: If ``results`` is empty.
        """
        typical_value = median(results)
        return typical_value < threshold
    
    # POST /evaluate-slope
    # Parses the raw request body as JSON, computes one trend slope per entry
    # of its 'results' list (using metadata.warmup), and answers whether the
    # median of those slopes is below metadata.threshold.
    # Documented as a comment rather than a docstring so the OpenAPI
    # description FastAPI derives from the handler stays unchanged.
    @app.post("/evaluate-slope",response_model=bool)
    async def evaluate_slope(request: Request):
        payload = json.loads(await request.body())
        slopes = [
            calculate_slope_trend(lag_series, payload['metadata']['warmup'])
            for lag_series in payload['results']
        ]
        slo_threshold = payload['metadata']["threshold"]
        return check_service_level_objective(results=slopes, threshold=slo_threshold)
    
    # Runs once when the module is imported, after the route above has been
    # registered on the application.
    logger.info("SLO evaluator is online")