import logging
import sqlite3
from common_func.constant import Constant
from common_func.hash_dict_constant import HashDictData
from common_func.ms_constant.str_constant import StrConstant
from common_func.ms_multi_process import MsMultiProcess
from msmodel.add_info.tensor_add_info_model import TensorAddInfoModel
from msparser.add_info.tensor_add_info_bean import TensorAddInfoBean
from msparser.data_struct_size_constant import StructFmt
from msparser.interface.data_parser import DataParser
from profiling_bean.prof_enum.data_tag import DataTag
class TensorAddInfoParser(DataParser, MsMultiProcess):
"""
ge tensor info data parser
"""
def __init__(self: any, file_list: dict, sample_config: dict) -> None:
super().__init__(sample_config)
super(DataParser, self).__init__(sample_config)
self._sample_config = sample_config
self._file_list = file_list
self._ge_tensor_info_data = []
self._project_path = sample_config.get(StrConstant.SAMPLE_CONFIG_PROJECT_PATH)
@staticmethod
def _get_tensor_info_data(bean_data: any) -> list:
if not bean_data:
return []
return [bean_data.level, bean_data.struct_type, bean_data.thread_id, bean_data.timestamp,
bean_data.node_id, bean_data.tensor_num,
bean_data.input_format, bean_data.input_data_type, bean_data.input_shape,
bean_data.output_format, bean_data.output_data_type, bean_data.output_shape]
@classmethod
def _generate_new_hash_dict_data(cls: any, hash_dict: dict, key: any, data: list) -> None:
hash_dict[key] = {
'level': data[0],
'add_info_type': data[1],
'thread_id': data[2],
'timestamp': data[3],
'node_id': data[4],
'tensor_num': 0,
'input_format': [],
'input_data_type': [],
'input_shape': [],
'output_format': [],
'output_data_type': [],
'output_shape': [],
}
cls._update_hash_dict_data(hash_dict, key, data)
@classmethod
def _update_hash_dict_data(cls: any, hash_dict: dict, key: any, data: list) -> None:
value = hash_dict.get(key, {})
value['tensor_num'] += data[5]
if data[6] or data[7] or data[8]:
value['input_format'].append(str(data[6]) if data[6] else '')
value['input_data_type'].append(str(data[7]) if data[7] else '')
value['input_shape'].append(str(data[8]) if data[8] else '')
if data[9] or data[10] or data[11]:
value['output_format'].append(str(data[9]) if data[9] else '')
value['output_data_type'].append(str(data[10]) if data[10] else '')
value['output_shape'].append(str(data[11]) if data[11] else '')
hash_dict[key] = value
def parse(self: any) -> None:
"""
parse ge tensor info data
"""
tensor_info_files = self._file_list.get(DataTag.TENSOR_ADD_INFO, [])
tensor_info_files = self.group_aging_file(tensor_info_files)
for file_list in tensor_info_files.values():
self._ge_tensor_info_data.extend(self.parse_bean_data(file_list, StructFmt.TENSOR_ADD_INFO_SIZE,
TensorAddInfoBean,
format_func=self._get_tensor_info_data,
check_func=self.check_magic_num,
))
self._update_tensor_data()
def save(self: any) -> None:
"""
save data
"""
if not self._ge_tensor_info_data:
return
model = TensorAddInfoModel(self._project_path)
with model as _model:
_model.flush(self._transform_tensor_info_data(self._ge_tensor_info_data))
def ms_run(self: any) -> None:
"""
parse and save ge tensor info data
:return:
"""
if not self._file_list.get(DataTag.TENSOR_ADD_INFO, []):
logging.info("No ge tensor info data, exit without parser running.")
return
try:
self.parse()
except (OSError, SystemError, ValueError, TypeError, RuntimeError) as err:
logging.error(str(err), exc_info=Constant.TRACE_BACK_SWITCH)
return
try:
self.save()
except sqlite3.Error as err:
logging.error(str(err), exc_info=Constant.TRACE_BACK_SWITCH)
def _update_tensor_data(self: any) -> None:
hash_dict = {}
for data in self._ge_tensor_info_data:
if len(data) < 12:
continue
key = (str(data[3]), str(data[2]), str(data[4]))
if key not in hash_dict.keys():
self._generate_new_hash_dict_data(hash_dict, key, data)
else:
self._update_hash_dict_data(hash_dict, key, data)
self._assemble_tensor_data(hash_dict)
def _assemble_tensor_data(self: any, hash_dict: dict) -> None:
self._ge_tensor_info_data = []
for key, value in hash_dict.items():
timestamp, thread_id, node_id = key
self._ge_tensor_info_data.append(
[
value['level'], value['add_info_type'], thread_id, timestamp, node_id, value['tensor_num'],
";".join(value['input_format']) if value['input_format'] else "N/A",
";".join(value['input_data_type']) if value['input_data_type'] else "N/A",
"\"" + ";".join(value['input_shape']) + "\"" if value['input_shape'] else "N/A",
";".join(value['output_format']) if value['output_format'] else "N/A",
";".join(value['output_data_type']) if value['output_data_type'] else "N/A",
"\"" + ";".join(value['output_shape']) + "\"" if value['output_shape'] else "N/A"
]
)
def _transform_tensor_info_data(self: any, data_list: list) -> list:
hash_dict_data = HashDictData(self._project_path)
type_hash_dict = hash_dict_data.get_type_hash_dict()
ge_hash_dict = hash_dict_data.get_ge_hash_dict()
for data in data_list:
data[1] = type_hash_dict.get('node', {}).get(data[1], data[1])
data[4] = ge_hash_dict.get(data[4], data[4])
return data_list