Skip to content
Snippets Groups Projects
Select Git revision
  • 03fb239c25e155e8ef41df7381ffde6c1b24982e
  • main default protected
  • v0.10
  • rework-examples
  • otel-demo-dynatrace-example
  • support-empty-query-response
  • java-operator-sdk
  • rework-state-handling
  • quarkus-36
  • bump-kotlinlogging-to-5.0.2
  • use-internal-registry protected
  • v0.9 protected
  • kafka-nodeport-config-windows
  • v0.8 protected
  • test-k3d protected
  • simpleuc4 protected
  • reduce-code-duplication
  • test-coverage
  • code-cleanup
  • cleanup-commit-interval protected
  • delete-action-for-other-namespace
  • v0.10.0 protected
  • v0.9.0 protected
  • v0.8.6 protected
  • v0.8.5 protected
  • v0.8.4 protected
  • v0.8.3 protected
  • v0.8.2 protected
  • v0.8.1 protected
  • v0.8.0 protected
  • v0.7.0 protected
  • v0.5.2 protected
  • v0.6.4 protected
  • v0.6.3 protected
  • v0.6.2 protected
  • v0.6.1 protected
  • v0.6.0 protected
  • v0.5.1 protected
  • v0.5.0 protected
  • v0.4.0 protected
  • v0.3.0 protected
41 results

main.py

Blame
  • main.py 1.86 KiB
    from fastapi import FastAPI,Request
    import trend_slope_computer as trend_slope_computer
    import logging
    import os
    import pandas as pd
    import json
    import sys
    from statistics import median
    
    # Application instance; the POST endpoint in this module is registered on it via @app.post.
    app = FastAPI()
    
    # Send log records to stdout using a timestamp/level/name prefix.
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        stream=sys.stdout,
    )
    logger = logging.getLogger("API")

    # Only these three LOG_LEVEL values are recognised; for any other value
    # (or when the variable is unset) the logger's level is left untouched.
    _requested_level = {
        'INFO': logging.INFO,
        'WARNING': logging.WARNING,
        'DEBUG': logging.DEBUG,
    }.get(os.getenv('LOG_LEVEL'))
    if _requested_level is not None:
        logger.setLevel(_requested_level)
    
    def calculate_slope_trend(results, warmup):
        """Flatten metric series into a data frame and compute its trend slope.

        Args:
            results: Iterable of series dicts. Each one is read through
                ``series['metric']`` (a mapping that may hold a
                ``consumergroup`` entry, defaulting to ``"default"``) and
                ``series['values']`` (items whose first element is a
                timestamp and whose second element is the measured value).
            warmup: Warmup duration (logged as seconds); passed unchanged to
                ``trend_slope_computer.compute``.

        Returns:
            The value returned by ``trend_slope_computer.compute``, or
            ``float('inf')`` when that call raises.
        """
        rows = []
        for series in results:
            consumer_group = series['metric'].get('consumergroup', "default")
            rows.extend(
                {
                    'group': consumer_group,
                    'timestamp': int(sample[0]),
                    # The literal string 'NaN' is recorded as 0 instead of being parsed.
                    'value': 0 if sample[1] == 'NaN' else float(sample[1]),
                }
                for sample in series['values']
            )

        frame = pd.DataFrame(rows)
        logger.info("Calculating trend slope with warmup of %s seconds for data frame:\n %s", warmup, frame)

        try:
            slope = trend_slope_computer.compute(frame, warmup)
        except Exception:
            # Best-effort by design: any failure is logged with its traceback and
            # reported to the caller as an infinite slope instead of propagating.
            logger.exception('Computing trend slope failed.')
            logger.error('Mark this subexperiment as not successful and continue benchmark.')
            return float('inf')

        logger.info("Computed lag trend slope is '%s'", slope)
        return slope
    
    def check_service_level_objective(results, threshold):
        """Return whether the median of *results* is strictly below *threshold*.

        Args:
            results: Non-empty collection of numeric values.
            threshold: Upper bound (exclusive) for the median.

        Returns:
            ``True`` if ``median(results) < threshold``, otherwise ``False``.
        """
        median_value = median(results)
        return median_value < threshold
    
    @app.post("/evaluate-slope",response_model=bool)
    async def evaluate_slope(request: Request):
        # The raw request body is parsed as JSON; it is read through the keys
        # 'results' (a list of result sets) and 'metadata' ('warmup', 'threshold').
        payload = json.loads(await request.body())
        # One trend slope per entry of payload['results'].
        slopes = [
            calculate_slope_trend(result_set, payload['metadata']['warmup'])
            for result_set in payload['results']
        ]
        threshold = payload['metadata']["threshold"]
        # The response is True when the median slope is below the threshold.
        return check_service_level_objective(results=slopes, threshold=threshold)
    
    # Runs once when this module is imported. The record is logged at INFO, so it is
    # emitted only when the "API" logger's effective level is INFO or lower.
    logger.info("SLO evaluator is online")