import configparser
import csv
import json
import logging
import os
import shutil
from collections import OrderedDict
from decimal import Decimal
from functools import reduce
from operator import add
from operator import mul
from operator import sub
from operator import truediv
from common_func.common import error
from common_func.constant import Constant
from common_func.file_manager import FdOpen
from common_func.file_manager import check_path_valid
from common_func.file_manager import is_link
from common_func.ms_constant.number_constant import NumberConstant
from common_func.ms_constant.str_constant import StrConstant
from common_func.path_manager import PathManager
from common_func.return_code_checker import ReturnCodeCheck
from msconfig.config_manager import ConfigManager
class MsvpCommonConst:
"""
msvp common
"""
TS_CPU = "ts_cpu"
AI_CPU = "ai_cpu"
AI_CORE = "ai_core"
CTRL_CPU = "ctrl_cpu"
CONSTANT = "constant"
CONFIG_PATH = os.path.join(os.path.realpath(os.path.dirname(__file__)), "..", "msconfig")
FILE_NAME = os.path.basename(__file__)
CPU_CONFIG_TYPE = {
TS_CPU: ConfigManager.get("TsCPUConfig"),
AI_CPU: ConfigManager.get("AICPUConfig"),
AI_CORE: ConfigManager.get("AICoreConfig"),
CTRL_CPU: ConfigManager.get("CtrlCPUConfig"),
CONSTANT: ConfigManager.get("ConstantConfig"),
}
@staticmethod
def class_name() -> str:
"""
class name
"""
return "MsvpCommonConst"
def file_name(self: any) -> str:
"""
file name
"""
return self.FILE_NAME
def config_file_obj(file_name: str = "constant") -> any:
"""
:param file_name: config file name
:return: config file object
"""
try:
config = MsvpCommonConst.CPU_CONFIG_TYPE.get(file_name)
return config
except configparser.Error:
return []
finally:
pass
def _get_cpu_metrics(sections: str, cpu_cfg: any) -> dict:
if sections == 'metrics':
metrics_map = cpu_cfg.items(sections)
metrics = OrderedDict(metrics_map)
return metrics
if sections == 'formula':
return OrderedDict(cpu_cfg.items("formula"))
if sections == 'formula_l2':
return OrderedDict(cpu_cfg.items("formula_l2"))
if sections == 'custom':
return OrderedDict(cpu_cfg.items("custom"))
return {}
def read_cpu_cfg(cpu_type: str, sections: str) -> dict:
"""
read cpu configure file
:param cpu_type: cpu type
:param sections: section for cpu
:return:
"""
cpu_cfg = MsvpCommonConst.CPU_CONFIG_TYPE.get(cpu_type)
try:
if sections in ['events', 'event2metric']:
events_map = cpu_cfg.items(sections)
events = {int(k, Constant.HEX_NUMBER): v for k, v in events_map}
return events
return _get_cpu_metrics(sections, cpu_cfg)
except configparser.Error:
return {}
finally:
pass
def get_cpu_event_config(sample_config: dict, cpu_type: str) -> list:
"""
get cpu event from config
:param sample_config: sample config
:param cpu_type: cpu type
:return: cpu events
"""
events = []
for _events in sample_config.get(cpu_type + "_profiling_events").split(","):
if _events != '0x11':
events.append(_events)
cpu_events = []
new_events = (events[i:i + 6] for i in range(0, len(events), 6))
for j in new_events:
cpu_events.append(j)
if cpu_type == 'ai_ctrl_cpu':
cpu_events[0] += ['r11']
else:
cpu_events[0] += ['0x11']
return cpu_events
def get_cpu_event_chunk(sample_config: dict, cpu_type: str) -> list:
"""
get cpu event chunk
:param sample_config: sample config
:param cpu_type: cpu type
:return: cpu events
"""
return get_cpu_event_config(sample_config, cpu_type)
def _do_change_file_mod(file_path: str) -> None:
files_list = []
file_path = os.path.realpath(file_path)
for lists in os.listdir(file_path):
path = os.path.join(file_path, lists)
files_list.append(path)
if not is_link(path) and os.path.isdir(path):
files_chmod(path)
for data_file in files_list:
if not os.path.isdir(data_file):
os.chmod(data_file, NumberConstant.FILE_AUTHORITY)
else:
os.chmod(data_file, NumberConstant.DIR_AUTHORITY)
os.chmod(file_path, NumberConstant.DIR_AUTHORITY)
def files_chmod(file_path: str) -> None:
"""
change the rights of the files
:param file_path: file path
:return: None
"""
try:
if os.path.exists(file_path):
_do_change_file_mod(file_path)
except (OSError, SystemError, ValueError, TypeError, RuntimeError) as err:
error(MsvpCommonConst.FILE_NAME, str(err))
finally:
pass
def create_csv(csv_file: str, headers: list, data: list, save_old_file: bool = True, use_dict=False) -> str:
"""
create csv table
:param csv_file: csv file path
:param headers: data header for csv file
:param data: data for csv file
:param save_old_file: save old file or not
:param use_dict: csv use DictWriter or not
:return: result for creating csv file
"""
if not bak_and_make_dir(csv_file, save_old_file):
return json.dumps({'status': NumberConstant.ERROR, 'info': str('bak or mkdir json dir failed'), 'data': ''})
try:
if use_dict:
create_normal_writer(csv_file, headers, data)
else:
create_csv_writer(csv_file, headers, data)
return json.dumps({'status': NumberConstant.SUCCESS, 'info': '', 'data': csv_file})
except (OSError, SystemError, ValueError, TypeError, RuntimeError) as err:
return json.dumps({'status': NumberConstant.ERROR, 'info': str(err), 'data': ''})
def create_csv_writer(csv_file: str, headers: list, data: list):
with FdOpen(csv_file, newline='') as _csv_file:
writer = csv.writer(_csv_file)
if headers:
writer.writerow(headers)
slice_count = len(data) // NumberConstant.DATA_NUM
for index in range(slice_count):
writer.writerows(data[index * NumberConstant.DATA_NUM:
(index + 1) * NumberConstant.DATA_NUM])
writer.writerows(data[slice_count * NumberConstant.DATA_NUM:])
def create_normal_writer(csv_file: str, headers: list, data: list):
with FdOpen(csv_file) as _csv_file:
if not headers:
return
_csv_file.write(','.join(headers))
slice_count = len(data) // NumberConstant.DATA_NUM
for index in range(slice_count):
_csv_file.writelines(data[index * NumberConstant.DATA_NUM:
(index + 1) * NumberConstant.DATA_NUM])
_csv_file.writelines(data[slice_count * NumberConstant.DATA_NUM:])
def create_json(json_file: str, headers: list, data: list, save_old_file: bool = True) -> str:
"""
create json file
:param json_file: json file name
:param headers: data header for json file
:param data: data for json file
:param save_old_file: save old file or not
:return: result of creating json file
"""
json_result = []
for each in data:
json_result.append(OrderedDict(list(zip(headers, each))))
if not bak_and_make_dir(json_file, save_old_file):
return json.dumps({'status': NumberConstant.ERROR, 'info': str('bak or mkdir csv dir failed'), 'data': ''})
try:
with FdOpen(json_file) as _json_file:
json.dump(json_result, _json_file)
return json.dumps({'status': NumberConstant.SUCCESS, 'info': '', 'data': json_file})
except (OSError, SystemError, ValueError, TypeError, RuntimeError) as err:
return json.dumps({'status': NumberConstant.ERROR, 'info': str(err), 'data': ''})
def create_json_for_dict(json_file: str, dict_result: dict) -> str:
"""
create json file for dict object
:param json_file: json file name
:param dict_result: dict
:return: result of creating json file
"""
if not bak_and_make_dir(json_file, False):
return json.dumps({'status': NumberConstant.ERROR, 'info': str('bak or mkdir csv dir failed'), 'data': ''})
try:
with FdOpen(json_file) as _json_file:
json.dump(dict_result, _json_file)
return json.dumps({'status': NumberConstant.SUCCESS, 'info': '', 'data': json_file})
except (OSError, SystemError, ValueError, TypeError, RuntimeError) as err:
return json.dumps({'status': NumberConstant.ERROR, 'info': str(err), 'data': ''})
def bak_and_make_dir(file: str, save_old_file: bool = True) -> bool:
json_file_back = file + ".bak"
try:
if os.path.exists(file):
if save_old_file:
shutil.move(file, json_file_back)
else:
os.remove(file)
except (OSError, SystemError, ValueError, TypeError, RuntimeError) as err:
logging.error(str(err), exc_info=Constant.TRACE_BACK_SWITCH)
return False
try:
if not os.path.exists(os.path.dirname(file)):
os.makedirs(os.path.dirname(file), Constant.FOLDER_MASK)
return True
except (OSError, SystemError, ValueError, TypeError, RuntimeError) as err:
logging.error(str(err), exc_info=Constant.TRACE_BACK_SWITCH)
return False
def path_check(path: str) -> str:
"""
check existence of path
:param path: file path
:return:
"""
if os.path.exists(path):
return path
return ""
def add_aicore_units(header: list):
"""
add unit for ai core report headers and modify total_time to aicore_time
:param header:
:return: headers
"""
for index, item in enumerate(header):
if item in {"total_time", "aic_total_time"}:
item = "aicore_time"
if item == "aiv_total_time":
item = "aiv_time"
if "time" in item:
item += "(us)"
if StrConstant.BANDWIDTH in item and "(GB/s)" not in item:
item += "(GB/s)"
header[index] = item
def check_dir_writable(path: str, create_dir: bool = False) -> None:
"""
check path is dir and writable
:param path: file path
:param create_dir: create directory or not
:return: None
"""
if not os.path.exists(path) and create_dir:
try:
os.makedirs(path, Constant.FOLDER_MASK)
except (OSError, SystemError, ValueError, TypeError, RuntimeError) as err:
ReturnCodeCheck.print_and_return_status(json.dumps(
{'status': NumberConstant.ERROR,
'info': "Failed to create path '%s'. %s" % (path, err)}))
finally:
pass
check_path_valid(path, False)
if not os.access(path, os.W_OK):
ReturnCodeCheck.print_and_return_status(json.dumps(
{'status': NumberConstant.ERROR,
'info': "The path '%s' does not have permission to write. Please "
'check that the path is writeable.' % path}))
def is_valid_original_data(file_name: str, project_path: str, is_conf: bool = False) -> bool:
"""
is original data
:param file_name: file name
:param project_path: project path
:param is_conf: is config file or not
:return: result of checking original data
"""
file_parent_path = project_path if is_conf else PathManager.get_data_dir(project_path)
if file_name.endswith(Constant.COMPLETE_TAG) or file_name.endswith(Constant.DONE_TAG) \
or file_name.endswith(Constant.ZIP_TAG):
return False
if os.path.exists(os.path.join(file_parent_path, file_name + Constant.COMPLETE_TAG)):
return False
return True
def float_calculate(input_list: list, operator: str = '+') -> str:
"""
float data calculate
:param input_list: data for calculating
:param operator: operator
:return: result after calculated
"""
operator_dict = {
StrConstant.OPERATOR_PLUS: add, StrConstant.OPERATOR_MINUS: sub,
StrConstant.OPERATOR_MULTIPLY: mul, StrConstant.OPERATOR_DIVISOR: truediv
}
if operator not in operator_dict or not input_list or None in input_list:
return str(0)
if operator == StrConstant.OPERATOR_DIVISOR and str(input_list[1]) == "0":
return str(0)
new_input_list = []
for i in input_list:
new_input_list.append(Decimal(str(i)))
input_list = new_input_list
try:
result = reduce(operator_dict.get(operator), input_list)
result = result.quantize(Decimal('0.000'))
return str(result)
except (OSError, SystemError, ValueError, TypeError, RuntimeError, ArithmeticError):
return str(0)
def is_number(float_num: any) -> bool:
"""
check whether s is number or not
:param float_num: number to check
:return: is number or not
"""
try:
float(float_num)
return True
except (ValueError, TypeError):
return False
finally:
pass
def is_nonzero_number(float_num: any) -> bool:
"""
check whether s is zero number or not
:param float_num: number to check
:return: is zero number or not
"""
return is_number(float_num) and not NumberConstant.is_zero(float(float_num))
def clear_project_dirs(project_dir: str) -> None:
"""
remove sqlite data and complete data
project_dir : result path
"""
if os.path.exists(PathManager.get_sql_dir(project_dir)):
for file_name in os.listdir(PathManager.get_sql_dir(project_dir)):
os.remove(os.path.join(PathManager.get_sql_dir(project_dir), file_name))
for file_name in os.listdir(PathManager.get_data_dir(project_dir)):
if file_name.endswith(Constant.COMPLETE_TAG):
os.remove(os.path.join(PathManager.get_data_dir(project_dir), file_name))
def format_high_precision_for_csv(data: str) -> str:
return data + '\t'