Skip to content
Snippets Groups Projects
Commit fbe13d8b authored by Simon Ehrenstein's avatar Simon Ehrenstein
Browse files

Add support for benchmarkin strategies

parent 6a3ecbe5
No related branches found
No related tags found
4 merge requests!39Add Support for Benchmarking Strategies,!36Implement Benchmarking Strategy: Heuristic 2 (Binary Search Strategy),!35Implement Benchmarking Strategy: Heuristic 1 (Step Strategy),!26Add Support for Benchmarking Strategies
#!/bin/bash
#!/usr/bin/env python
UC=$1
IFS=', ' read -r -a DIM_VALUES <<< "$2"
IFS=', ' read -r -a REPLICAS <<< "$3"
PARTITIONS=${4:-40}
CPU_LIMIT=${5:-1000m}
MEMORY_LIMIT=${6:-4Gi}
KAFKA_STREAMS_COMMIT_INTERVAL_MS=${7:-100}
EXECUTION_MINUTES=${8:-5}
import sys
import os
from strategies.config import ExperimentConfig
import strategies.strategies.default_strategy as default_strategy
from strategies.experiment_execution import ExperimentExecutor
import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor
# Get and increment counter
EXP_ID=$(cat exp_counter.txt 2>/dev/null || echo "0")
echo $((EXP_ID+1)) > exp_counter.txt
uc=sys.argv[1]
dim_values=sys.argv[2].split(',')
replicas=sys.argv[3].split(',')
partitions=sys.argv[4] if len(sys.argv) >= 5 and sys.argv[4] else 40
cpu_limit=sys.argv[5] if len(sys.argv) >= 6 and sys.argv[5] else "1000m"
memory_limit=sys.argv[6] if len(sys.argv) >= 7 and sys.argv[6] else "4Gi"
kafka_streams_commit_interval_ms=sys.argv[7] if len(sys.argv) >= 8 and sys.argv[7] else 100
execution_minutes=sys.argv[8] if len(sys.argv) >= 9 and sys.argv[8] else 5
benchmark_strategy=sys.argv[9] if len(sys.argv) >= 10 and sys.argv[9] else "default"
# Store meta information
IFS=$', '; echo \
"UC=$UC
DIM_VALUES=${DIM_VALUES[*]}
REPLICAS=${REPLICAS[*]}
PARTITIONS=$PARTITIONS
CPU_LIMIT=$CPU_LIMIT
MEMORY_LIMIT=$MEMORY_LIMIT
KAFKA_STREAMS_COMMIT_INTERVAL_MS=$KAFKA_STREAMS_COMMIT_INTERVAL_MS
EXECUTION_MINUTES=$EXECUTION_MINUTES
" >> "exp${EXP_ID}_uc${UC}_meta.txt"
print("Chosen benchmarking strategy: "+benchmark_strategy)
print("Going to execute " + str(len(dim_values)*len(replicas)) + " subexperiments in total..")
SUBEXPERIMENTS=$((${#DIM_VALUES[@]} * ${#REPLICAS[@]}))
SUBEXPERIMENT_COUNTER=0
echo "Going to execute $SUBEXPERIMENTS subexperiments in total..."
for DIM_VALUE in "${DIM_VALUES[@]}"
do
for REPLICA in "${REPLICAS[@]}"
do
SUBEXPERIMENT_COUNTER=$((SUBEXPERIMENT_COUNTER+1))
echo "Run subexperiment $SUBEXPERIMENT_COUNTER/$SUBEXPERIMENTS with config: $DIM_VALUE $REPLICA"
./run_uc$UC-new.sh $EXP_ID $DIM_VALUE $REPLICA $PARTITIONS $CPU_LIMIT $MEMORY_LIMIT $KAFKA_STREAMS_COMMIT_INTERVAL_MS $EXECUTION_MINUTES
sleep 10s
done
done
experiment_config = ExperimentConfig(uc, dim_values, replicas, partitions, cpu_limit, memory_limit, kafka_streams_commit_interval_ms, execution_minutes, default_strategy, subexperiment_executor)
executor = ExperimentExecutor(experiment_config)
executor.execute()
\ No newline at end of file
*.pyc
\ No newline at end of file
class ExperimentConfig:
""" Wrapper for the configuration of an experiment. """
def __init__(self, use_case, dim_values, replicas, partitions, cpu_limit, memory_limit, kafka_streams_commit_interval_ms, execution_minutes, benchmarking_strategy, subexperiment_executor):
self.use_case=use_case
self.dim_values=dim_values
self.replicas=replicas
self.partitions=partitions
self.cpu_limit=cpu_limit
self.memory_limit=memory_limit
self.kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms
self.execution_minutes=execution_minutes
self.benchmarking_strategy=benchmarking_strategy
self.subexperiment_executor=subexperiment_executor
\ No newline at end of file
class ExperimentExecutor:
def __init__(self, config):
self.config=config
def execute(self):
self.config.benchmarking_strategy.execute(self.config)
class SubexperimentConfig:
""" Wrapper for the configuration of a subexperiment """
def __init__(self, use_case, counter, dim_value, replicas, partitions, cpu_limit, memory_limit, kafka_streams_commit_interval_ms, execution_minutes, subexperiment_executor):
self.use_case=use_case
self.counter=counter
self.dim_value=dim_value
self.replicas=replicas
self.partitions=partitions
self.cpu_limit=cpu_limit
self.memory_limit=memory_limit
self.kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms
self.execution_minutes=execution_minutes
self.subexperiment_executor=subexperiment_executor
\ No newline at end of file
# Contains the default strategy that executes a subexperiment for all combinations of instances and dimension values.
import os
from config import SubexperimentConfig
def execute(config):
subexperiment_counter=0
subexperiments_total=len(config.dim_values)*len(config.replicas)
for dim_value in config.dim_values:
for replica in config.replicas:
subexperiment_counter+=1
print("Run subexperiment " + str(subexperiment_counter) + "/" + str(subexperiments_total) + " with config " + str(dim_value) + " " + str(replica))
subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, dim_value, replica, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.subexperiment_executor)
config.subexperiment_executor.execute(subexperiment_config)
# Wrapper that makes the execution method of a subexperiment interchangable.
import os
dirname = os.path.dirname(__file__)
os.chdir(dirname+"/../../")
def execute(subexperiment_config):
os.system("./run_uc"+subexperiment_config.use_case+"-new.sh "+str(subexperiment_config.counter)+" "+str(subexperiment_config.dim_value)+" "+str(subexperiment_config.replicas)+" "+str(subexperiment_config.partitions)+" "+subexperiment_config.cpu_limit+" "+subexperiment_config.memory_limit+" "+str(subexperiment_config.kafka_streams_commit_interval_ms)+" "+str(subexperiment_config.execution_minutes))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment