Custom Analysis Rule Development Guide
Overview
Custom analysis rules operate on the profile data stored in the analysis.db and ascend_pytorch_profiler_{rank_id}.db files. They are implemented in the same way as the existing cann_api_sum, compute_op_sum, and hccl_sum analyses, and they let you define your own logic for analyzing profile data.
Procedure for Developing Custom Analysis Rules
Perform the following steps:
- Create an xxx directory and an xxx.py file in msprof_analyze/cluster_analyse/recipes of the msprof-analyze code repository. Here is an example: msprof_analyze/cluster_analyse/recipes/cann_api_sum/cann_api_sum.py. The directory name must match the file name. The directory name is also the value you pass to the --mode option to enable the custom analysis rule when you run the msprof-analyze cluster tool.

- Develop profile data analysis rules in the xxx.py file by inheriting from the BaseRecipeAnalysis class and implementing the run function.

  The typical implementation of the run function is as follows:

      def run(self, context):
          mapper_res = self.mapper_func(context)
          self.reducer_func(mapper_res)
          if self._export_type == "db":
              self.save_db()
          elif self._export_type == "notebook":
              self.save_notebook()
          else:
              logger.error("Unknown export type.")

  - mapper_func: queries multi-rank data and merges the results. Because every rank in a cluster is processed in the same way, the context processes the profile data of all ranks in parallel and assembles the results in order. You only need to implement the self._mapper_func function for single-rank data processing.

        def mapper_func(self, context):
            return context.wait(
                context.map(
                    self._mapper_func,
                    self._get_rank_db(),
                    analysis_class=self._recipe_name
                )
            )

        def _mapper_func(self, data_map, analysis_class):
            """
            Extract the profiling data required for cluster analysis from each device,
            and then aggregate the results from each device to be processed by a reduce function.
            Params:
                data_map: eg. {"RANK_ID": 1, "profiler_db_path": "xxxx/ascend_pytorch_profiler_1.db"}
                analysis_class: hccl_sum, compute_op_sum, cann_api_sum, mstx_sum......
            """
            pass

  - reducer_func: analyzes and processes multi-rank results. This function receives the return values from mapper_func and summarizes the cluster data into a DataFrame.

  - save_db: saves analysis results in cluster_analysis.db.

  - save_notebook: saves analysis results in CSV and stats.ipynb formats.
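The map-then-reduce flow described above can be tried outside the tool with a small stand-in. The following is only a sketch under stated assumptions: ToyContext, toy_mapper, toy_reducer, and the sample durations are hypothetical names and values invented for illustration, and the real context object in msprof-analyze may behave differently. It mirrors only the call shape context.wait(context.map(func, items, analysis_class=...)) shown in mapper_func.

```python
from concurrent.futures import ThreadPoolExecutor


class ToyContext:
    """Hypothetical stand-in for the context object passed to run()."""

    def __init__(self, max_workers=4):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def map(self, func, iterable, **kwargs):
        # Submit one task per rank; the list of futures keeps the input order.
        return [self._pool.submit(func, item, **kwargs) for item in iterable]

    def wait(self, futures):
        # Collect results in submission order, that is, in rank order.
        return [future.result() for future in futures]


def toy_mapper(data_map, analysis_class):
    # Single-rank step. A real rule would query data_map["profiler_db_path"] here;
    # this sketch reads made-up sample values instead.
    durations = data_map["durations"]
    return {
        "rank": data_map["RANK_ID"],
        "rule": analysis_class,
        "total": sum(durations),
        "count": len(durations),
    }


def toy_reducer(mapper_res):
    # Multi-rank step: combine the per-rank results into one cluster summary.
    total = sum(item["total"] for item in mapper_res)
    count = sum(item["count"] for item in mapper_res)
    return {
        "ranks": [item["rank"] for item in mapper_res],
        "total": total,
        "mean": total / count,
    }


rank_inputs = [
    {"RANK_ID": 0, "durations": [10, 20]},
    {"RANK_ID": 1, "durations": [30]},
]
context = ToyContext()
mapper_res = context.wait(context.map(toy_mapper, rank_inputs, analysis_class="toy_sum"))
summary = toy_reducer(mapper_res)
print(summary)
```

The sketch returns plain dictionaries to stay dependency-free; in an actual rule, reducer_func would build a DataFrame as described above.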
- The self._mapper_func function relies on querying data from a single database. You can use either of the following two methods:

  - Use DatabaseService to configure single-table queries. For details, see mstx2commop.py.

    Example:

        service = DatabaseService(profiler_db_path)
        # First argument: table name; second argument: field list.
        # By default, no field is specified and SELECT * is executed to query all fields.
        service.add_table_for_query("ENUM_HCCL_DATA_TYPE", ["id", "name"])
        # Multiple tables can be added.
        service.add_table_for_query("STRING_IDS", ["id", "value"])
        # Queries all specified tables in sequence and returns a dict
        # (key: table name; value: DataFrame-type query result).
        df_dict = service.query_data()

  - Create a new .py file in the msprof_analyze/prof_exports directory and define a class that inherits from BaseStatsExport. To avoid duplication, check whether an existing file meets your needs before creating a new one. The sample code is as follows:

        from msprof_analyze.prof_exports.base_stats_export import BaseStatsExport

        QUERY = """
        SELECT
            NAME_IDS.value AS "OpName",
            TYPE_IDS.value AS "OpType",
            round(endNs - startNs) AS "Duration",
            GROUP_NAME_IDS.value AS "GroupName"
        FROM
            COMMUNICATION_OP
        LEFT JOIN
            STRING_IDS AS TYPE_IDS
            ON TYPE_IDS.id = COMMUNICATION_OP.opType
        LEFT JOIN
            STRING_IDS AS NAME_IDS
            ON NAME_IDS.id = COMMUNICATION_OP.opName
        LEFT JOIN
            STRING_IDS AS GROUP_NAME_IDS
            ON GROUP_NAME_IDS.id = COMMUNICATION_OP.groupName
        """

        class HcclSumExport(BaseStatsExport):
            def __init__(self, db_path, recipe_name):
                super().__init__(db_path, recipe_name)
                self._query = QUERY

    Example: df = HcclSumExport(profiler_db_path, analysis_class).read_export_db(). The returned data type is DataFrame.
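To see what the sample query returns, it can be run against a throwaway in-memory SQLite database. This is a minimal sketch, assuming a heavily simplified schema: the two tables below contain only the columns that the query references, and the inserted rows are made-up sample values. The real profiler databases contain more tables and columns.

```python
import sqlite3

# Same join logic as the sample QUERY: each integer ID column in
# COMMUNICATION_OP is resolved to its string through STRING_IDS.
QUERY = """
SELECT
    NAME_IDS.value AS "OpName",
    TYPE_IDS.value AS "OpType",
    round(endNs - startNs) AS "Duration",
    GROUP_NAME_IDS.value AS "GroupName"
FROM COMMUNICATION_OP
LEFT JOIN STRING_IDS AS TYPE_IDS ON TYPE_IDS.id = COMMUNICATION_OP.opType
LEFT JOIN STRING_IDS AS NAME_IDS ON NAME_IDS.id = COMMUNICATION_OP.opName
LEFT JOIN STRING_IDS AS GROUP_NAME_IDS ON GROUP_NAME_IDS.id = COMMUNICATION_OP.groupName
"""

conn = sqlite3.connect(":memory:")
# Assumed, simplified schema and made-up sample rows for illustration only.
conn.executescript("""
CREATE TABLE STRING_IDS (id INTEGER PRIMARY KEY, value TEXT);
CREATE TABLE COMMUNICATION_OP (
    opName INTEGER, opType INTEGER, groupName INTEGER,
    startNs INTEGER, endNs INTEGER
);
INSERT INTO STRING_IDS VALUES (1, 'sample_op_name'), (2, 'sample_op_type'), (3, 'sample_group');
INSERT INTO COMMUNICATION_OP VALUES (1, 2, 3, 1000, 4500);
""")

cursor = conn.execute(QUERY)
columns = [description[0] for description in cursor.description]
rows = cursor.fetchall()
conn.close()

print(columns)
print(rows)
```

Because the joins are LEFT JOINs, an operator whose ID has no match in STRING_IDS still appears in the result, with NULL in the unresolved column.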
- Add extended parameters to analysis rules.

  Implement the add_parser_argument function. Example:

      @classmethod
      def add_parser_argument(cls, parser):
          parser.add_argument("--top_num", type=str, help="Duration cost top count", default=cls.DEFAULT_TOP_NUM)

  Obtain the corresponding extended parameters from self._extra_args:

      def __init__(self, params):
          super().__init__(params)
          top_num = self._extra_args.get(self.TOP_NUM, self.DEFAULT_TOP_NUM)
          self.top_num = int(top_num) if isinstance(top_num, str) and top_num.isdigit() else self.DEFAULT_TOP_NUM

- Run the custom analysis rule command.

      msprof-analyze cluster -d {cluster profiling data path} --mode xxx --top_num 10
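The validation applied to --top_num in the steps above can be checked in isolation with argparse. This is a sketch only: parse_top_num is a hypothetical helper, and the value 15 for DEFAULT_TOP_NUM is an assumption chosen for illustration, since the guide does not state the actual default.

```python
import argparse

DEFAULT_TOP_NUM = 15  # assumed value for illustration; the real default is not stated


def parse_top_num(raw_value, default=DEFAULT_TOP_NUM):
    # Same check as in the __init__ example: accept only strings of digits,
    # and fall back to the default for anything else.
    if isinstance(raw_value, str) and raw_value.isdigit():
        return int(raw_value)
    return default


parser = argparse.ArgumentParser()
parser.add_argument("--top_num", type=str, help="Duration cost top count", default=DEFAULT_TOP_NUM)

valid = parse_top_num(parser.parse_args(["--top_num", "10"]).top_num)
invalid = parse_top_num(parser.parse_args(["--top_num", "ten"]).top_num)
negative = parse_top_num(parser.parse_args(["--top_num=-5"]).top_num)
missing = parse_top_num(parser.parse_args([]).top_num)

print(valid, invalid, negative, missing)
```

Keeping the argument as a string and validating it afterwards means that a malformed value such as "ten" or "-5" falls back to the default instead of making argparse exit with an error.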