# baishanyang: init project
# Commit 5f1c8c3b, created 4 days ago (commit history)
import re
import json
from typing import Dict, List, Optional
from pathlib import Path

# Location of the operator replacement table, relative to this file's
# grandparent directory.
CONFIG_PATH = Path(__file__).parent.parent.joinpath("config", "operator_replacements.json")


def load_operator_config() -> Dict:
    """Read the operator replacement table from ``CONFIG_PATH``.

    Returns:
        The parsed JSON content of the file, or an empty dict when the
        file does not exist.
    """
    if not CONFIG_PATH.exists():
        return {}
    with CONFIG_PATH.open("r", encoding="utf-8") as handle:
        return json.load(handle)

# Catalogue of known runtime failures, keyed by error type.
#
# Each entry holds:
#   "patterns":    regular expressions searched (case-insensitively, by
#                  analyze_runtime_error) in the error log
#   "diagnosis":   one-line description of the root cause
#   "suggestions": ordered list of remediation hints
#   "device":      (device-specific operator entries only) the device the
#                  entry applies to
#
# Because matching is case-insensitive, short tokens must be anchored so they
# do not fire inside ordinary words (e.g. "OOM" inside "room", "Mod" inside
# "model").
RUNTIME_ERROR_PATTERNS = {
    "shape_mismatch": {
        "patterns": [
            r"shape\s*mismatch",
            r"dimension\s*mismatch",
            r"size\s*mismatch",
            r"expected\s*shape.*got",
            r"tensor\s*shape.*does\s*not\s*match",
            r"shape.*expected.*got",
            r"input\s*shape.*invalid",
            r"dimension\s*\d+\s*mismatch"
        ],
        "diagnosis": "输入tensor维度与模型期望不匹配",
        "suggestions": [
            "检查输入数据的shape是否与模型定义一致",
            "使用benchmark的--inputShape参数指定正确维度",
            "重新转换模型时使用converter_lite的--inputShape参数固化shape"
        ]
    },
    "dtype_error": {
        "patterns": [
            r"dtype\s*error",
            r"data\s*type\s*mismatch",
            r"type\s*mismatch",
            r"dtype\s*not\s*supported",
            r"float16\s*error",
            r"int8\s*error",
            r"expected\s*dtype.*got",
            r"unsupported\s*data\s*type"
        ],
        "diagnosis": "输入/输出数据类型与模型定义不符",
        "suggestions": [
            "使用converter_lite的--inputDataType/--outputDataType指定数据类型",
            "benchmark时使用--benchmarkDataType参数指定输入数据类型",
            "检查是否需要启用--enableFp16进行FP16推理"
        ]
    },
    "memory_error": {
        "patterns": [
            # Leading word boundary: without it the case-insensitive search
            # also hits "room", "zoom", "bloom", ...
            r"\bOOM",
            r"out\s*of\s*memory",
            r"allocate\s*failed",
            r"memory\s*allocation\s*error",
            r"memory\s*error",
            r"cannot\s*allocate",
            r"insufficient\s*memory",
            r"memory\s*exhausted"
        ],
        "diagnosis": "内存不足导致推理失败",
        "suggestions": [
            "启用INT8量化重新转换模型以减小模型大小",
            "减小输入数据的batch size",
            "使用converter_lite的--fp16=on减小模型体积",
            "切换到CPU运行避免GPU显存限制"
        ]
    },
    "unsupported_op_cpu": {
        "patterns": [
            r"op\s*not\s*supported.*CPU",
            r"CPU.*unsupported\s*operator",
            r"operator\s*not\s*implemented.*CPU",
            r"kernel\s*not\s*found.*CPU",
            r"no\s*kernel\s*for\s*op.*CPU",
            r"CPU.*算子.*不支持",
            r"算子.*CPU.*不支持",
            r"Expand.*not\s*supported",
            r"Expand.*kernel\s*not\s*found"
        ],
        "diagnosis": "算子在CPU设备上未实现",
        "suggestions": [
            "尝试使用GPU或NPU设备运行: --device=GPU 或 --device=NPU",
            "如果算子可替换,使用ohos_runtime_op_fix修复ONNX模型",
            "查看MindSpore Lite算子支持列表确认CPU支持情况"
        ],
        "device": "CPU"
    },
    "unsupported_op_npu": {
        "patterns": [
            r"op\s*not\s*supported.*NPU",
            r"NPU.*unsupported\s*operator",
            r"operator\s*not\s*implemented.*NPU",
            r"kernel\s*not\s*found.*NPU",
            r"no\s*kernel\s*for\s*op.*NPU",
            r"NPU.*算子.*不支持",
            r"算子.*NPU.*不支持",
            r"Split.*not\s*supported.*NPU",
            r"Split.*kernel\s*not\s*found",
            # The lookahead rejects "model", "mode", "module", ... which the
            # bare "Mod" prefix matched under case-insensitive search, while
            # still accepting "Mod", "Mod_12" and "FloorMod".
            r"Mod(?![a-z]).*not\s*supported",
            r"TopK.*not\s*supported.*NPU",
            r"OneHot.*not\s*supported.*NPU"
        ],
        "diagnosis": "算子在NPU设备上未实现",
        "suggestions": [
            "尝试使用CPU或GPU设备运行: --device=CPU 或 --device=GPU",
            "使用ohos_runtime_op_fix工具修复ONNX模型中不支持的算子",
            "查看MindSpore Lite算子支持列表确认NPU支持情况"
        ],
        "device": "NPU"
    },
    "unsupported_op_gpu": {
        "patterns": [
            r"op\s*not\s*supported.*GPU",
            r"GPU.*unsupported\s*operator",
            r"operator\s*not\s*implemented.*GPU",
            r"kernel\s*not\s*found.*GPU",
            r"no\s*kernel\s*for\s*op.*GPU",
            r"OpenCL.*error",
            r"GPU.*算子.*不支持"
        ],
        "diagnosis": "算子在GPU设备上未实现",
        "suggestions": [
            "尝试使用CPU或NPU设备运行: --device=CPU 或 --device=NPU",
            "如果算子可替换,使用ohos_runtime_op_fix修复ONNX模型",
            "查看MindSpore Lite算子支持列表确认GPU支持情况"
        ],
        "device": "GPU"
    },
    "unsupported_op_runtime": {
        "patterns": [
            r"op\s*not\s*supported",
            r"unsupported\s*operator",
            r"operator\s*not\s*implemented",
            r"operator\s*failed",
            r"算子.*不支持",
            r"算子.*未实现",
            r"kernel\s*not\s*found",
            r"no\s*kernel\s*for\s*op"
        ],
        "diagnosis": "算子在目标设备上未实现",
        "suggestions": [
            "查看MindSpore Lite算子支持列表确认算子是否支持",
            "切换设备(CPU/GPU/NPU)尝试不同实现",
            "如果问题算子可替换,使用ohos_runtime_op_fix修改ONNX模型"
        ]
    },
    "device_error": {
        "patterns": [
            r"GPU\s*not\s*available",
            r"NPU\s*error",
            r"OpenCL\s*error",
            r"device\s*not\s*found",
            r"device\s*failed",
            r"driver\s*not\s*initialized",
            r"CUDA\s*error",
            r"硬件.*错误"
        ],
        "diagnosis": "目标设备不可用或驱动问题",
        "suggestions": [
            "检查目标设备(GPU/NPU)的驱动是否正确安装",
            "切换到CPU设备运行: --device=CPU",
            "检查设备环境配置是否正确"
        ]
    },
    "input_error": {
        "patterns": [
            r"input\s*error",
            r"input\s*file\s*not\s*found",
            r"invalid\s*input",
            r"input\s*tensor\s*error",
            r"cannot\s*read\s*input",
            r"输入.*错误",
            r"输入.*文件.*不存在",
            r"input\s*data\s*invalid"
        ],
        "diagnosis": "输入数据文件不存在或格式错误",
        "suggestions": [
            "检查benchmark的--inDataFile参数指定的路径是否正确",
            "验证输入数据的格式是否符合模型要求",
            "使用随机输入测试模型是否正常工作"
        ]
    },
    "model_load_error": {
        "patterns": [
            r"model\s*load\s*failed",
            r"model\s*invalid",
            r"parse\s*error",
            r"file\s*corrupted",
            r"模型.*加载.*失败",
            r"模型.*无效",
            r"cannot\s*parse\s*model",
            r"model\s*file\s*error"
        ],
        "diagnosis": ".ms文件损坏或格式错误",
        "suggestions": [
            "检查.ms文件是否完整,大小是否正常",
            "重新执行ONNX到.ms的转换流程",
            "检查转换时的参数配置是否正确"
        ]
    },
    "quant_accuracy_error": {
        "patterns": [
            r"accuracy\s*threshold",
            r"cosine\s*distance",
            r"precision\s*loss",
            r"accuracy\s*error",
            r"量化.*精度",
            r"精度.*下降",
            r"accuracy\s*below\s*threshold",
            r"verification\s*failed"
        ],
        "diagnosis": "INT8量化后精度不达标",
        "suggestions": [
            "检查量化校准数据集是否合理且覆盖典型输入",
            "考虑使用混合精度量化或部分量化",
            "不使用量化重新转换模型对比精度差异"
        ]
    },
    "negative_dimension": {
        "patterns": [
            r"shape of tensor contains negative dimension",
            r"negative dimension",
            r"dimension.*-1",
            r"CheckTensorsInvalid",
            r"assign the input shape",
            r"Resize\(\)"
        ],
        "diagnosis": "模型包含动态shape维度,benchmark时未指定输入尺寸",
        "suggestions": [
            "使用ohos_model_info工具检查模型输入shape定义",
            "在benchmark时使用--inputShape参数指定输入维度,格式:input:1,3,224,224",
            "重新转换模型时使用converter_lite的--inputShape参数固化shape"
        ]
    }
}

def extract_operator_from_error(error_log: str) -> Optional[str]:
    """Pull the name of the unsupported operator out of an error log.

    The patterns are tried in order and matched case-insensitively; the first
    one that matches decides the result.

    Args:
        error_log: Raw error text.

    Returns:
        The captured operator name, or None when no pattern matches.
    """
    patterns = (
        r"operator\s*['\"]?(\w+)['\"]?\s*is\s*not\s*(?:implemented|supported)",
        # "op"/"operator" must be a standalone word; otherwise the "op" inside
        # names such as "Dropout" or "TopK" is taken for the keyword and only
        # the tail of the name ("out", "K") is captured.
        r"\bop(?:erator)?\b\s*['\"]?(\w+)['\"]?\s*not\s*supported",
        r"unsupported\s*operator[:\s]*['\"]?(\w+)['\"]?",
        r"kernel\s*not\s*found\s*for[:\s]*['\"]?(\w+)['\"]?",
        # Accept the full word "operator" too, so it is not split into
        # "op" + captured "erator".
        r"no\s*kernel\s*for\s*op(?:erator)?\b[:\s]*['\"]?(\w+)['\"]?",
        r"算子\s*['\"]?(\w+)['\"]?\s*不支持",
        r"['\"]?(\w+)['\"]?\s*算子.*不支持",
    )
    for pattern in patterns:
        match = re.search(pattern, error_log, re.IGNORECASE)
        if match:
            return match.group(1)
    return None

def detect_device_from_error(error_log: str) -> Optional[str]:
    """Guess which device reported an unsupported-operator failure.

    The log is lower-cased and checked for CPU, then NPU, then GPU. A device
    is reported only when its name appears together with an "op ... not
    supported" or "kernel ... not found" phrase on the same line.

    Args:
        error_log: Raw error text.

    Returns:
        "CPU", "NPU" or "GPU" for the first device that matches, else None.
    """
    lowered = error_log.lower()
    for name in ("cpu", "npu", "gpu"):
        unsupported_on_device = (
            rf"op.*not\s*supported.*{name}"
            rf"|{name}.*op.*not\s*supported"
            rf"|kernel.*not\s*found.*{name}"
        )
        if re.search(unsupported_on_device, lowered):
            return name.upper()
    return None

def get_replacement_info(operator: str, device: str) -> Optional[Dict]:
    """Look up how an operator can be replaced on a given device.

    The operator table is loaded via ``load_operator_config``. An entry is
    reported only when its ``"<device>_supported"`` flag (device name
    lower-cased) is present and falsy; a missing flag counts as supported.

    Args:
        operator: Operator name used as the key in the table.
        device: Device name, e.g. "CPU", "GPU" or "NPU".

    Returns:
        A dict with the keys "operator", "device", "strategy", "replacement",
        "description" and "has_fix", or None when the operator is unknown or
        is marked as supported on the device.
    """
    op_table = load_operator_config()
    if operator not in op_table:
        return None

    entry = op_table[operator]
    if entry.get(f"{device.lower()}_supported", True):
        return None

    strategy = entry.get("strategy")
    return {
        "operator": operator,
        "device": device,
        "strategy": strategy,
        "replacement": entry.get("replacement"),
        "description": entry.get("description"),
        "has_fix": strategy is not None,
    }

def analyze_runtime_error(error_log: str, device: Optional[str] = None) -> Dict:
    """Classify an error log against ``RUNTIME_ERROR_PATTERNS``.

    Every error type with at least one matching pattern (case-insensitive
    search) yields one entry. For the device-specific unsupported-operator
    types the operator name is extracted and, when the operator table marks
    it as unsupported on the target device and provides a strategy, two
    replacement hints are put in front of the generic suggestions.

    Args:
        error_log: Raw error text.
        device: Device the model ran on. When omitted or empty it is guessed
            from the log with ``detect_device_from_error``.

    Returns:
        A dict with:
          "matched_errors": entries sorted by descending confidence, each
              with "error_type", "diagnosis", "suggestions",
              "matched_patterns", "confidence" and optionally "device",
              "operator" and "replacement"
          "total_matches": number of entries
          "has_errors": whether any entry exists
          "detected_device" / "detected_operator": present only when known
    """
    matched_errors = []
    detected_device = device or detect_device_from_error(error_log)
    detected_operator = None

    for error_type, config in RUNTIME_ERROR_PATTERNS.items():
        matched_patterns = [
            pattern
            for pattern in config["patterns"]
            if re.search(pattern, error_log, re.IGNORECASE)
        ]
        if not matched_patterns:
            continue

        # Confidence is the share of this type's patterns that matched.
        confidence = len(matched_patterns) / len(config["patterns"])
        error_info = {
            "error_type": error_type,
            "diagnosis": config["diagnosis"],
            # Copy so that callers editing the result cannot alter the
            # module-level pattern table.
            "suggestions": list(config["suggestions"]),
            "matched_patterns": matched_patterns,
            "confidence": round(confidence, 2)
        }

        if "device" in config:
            error_info["device"] = config["device"]

        # Only the device-specific operator types trigger operator lookup;
        # the generic "unsupported_op_runtime" type is excluded.
        if error_type.startswith("unsupported_op") and not error_type.endswith("_runtime"):
            detected_operator = extract_operator_from_error(error_log)
            if detected_operator:
                error_info["operator"] = detected_operator

                target_device = detected_device or config.get("device", "NPU")
                replacement_info = get_replacement_info(detected_operator, target_device)
                if replacement_info:
                    error_info["replacement"] = replacement_info
                    if replacement_info["has_fix"]:
                        error_info["suggestions"] = [
                            f"使用 ohos_runtime_op_fix 工具替换 {detected_operator} 算子",
                            f"替换策略: {replacement_info['description']}"
                        ] + error_info["suggestions"]

        matched_errors.append(error_info)

    matched_errors.sort(key=lambda x: x["confidence"], reverse=True)

    result = {
        "matched_errors": matched_errors,
        "total_matches": len(matched_errors),
        "has_errors": len(matched_errors) > 0
    }

    if detected_device:
        result["detected_device"] = detected_device
    if detected_operator:
        result["detected_operator"] = detected_operator

    return result

def load_error_example(error_type: str) -> Optional[Dict]:
    """Load the example JSON file for an error type.

    The file is looked up as ``examples/<error_type>.json`` under this
    file's grandparent directory.

    Args:
        error_type: Name of the error type.

    Returns:
        The parsed JSON content, or None when the file does not exist.
    """
    examples_dir = Path(__file__).parent.parent / "examples"
    example_file = examples_dir / f"{error_type}.json"

    if not example_file.exists():
        return None

    with open(example_file, "r", encoding="utf-8") as f:
        return json.load(f)

def get_all_error_types() -> List[str]:
    """List every supported error type.

    Returns:
        The keys of ``RUNTIME_ERROR_PATTERNS`` in definition order.
    """
    return [error_type for error_type in RUNTIME_ERROR_PATTERNS]

def get_error_info(error_type: str) -> Optional[Dict]:
    """Return the configuration of one error type.

    Args:
        error_type: Name of the error type.

    Returns:
        The entry from ``RUNTIME_ERROR_PATTERNS`` (the stored object, not a
        copy), or None when the error type is unknown.
    """
    return RUNTIME_ERROR_PATTERNS.get(error_type)

# Command-line entry point: analyse a log given either as a file path or as
# the log text itself, and print the matched error types.
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("Usage: python runtime_error_analyzer.py <error_log>")
        print("\nSupported error types:")
        for error_type in get_all_error_types():
            info = get_error_info(error_type)
            print(f"  - {error_type}: {info['diagnosis']}")
        sys.exit(1)

    error_log = sys.argv[1]

    # The argument may be raw log text rather than a path. Probing such text
    # on the filesystem can raise (for example when it is longer than a file
    # name may be), so any failure here means "treat it as log text".
    # is_file() also keeps a directory argument from reaching open().
    try:
        is_log_file = Path(error_log).is_file()
    except (OSError, ValueError):
        is_log_file = False

    if is_log_file:
        with open(error_log, "r", encoding="utf-8") as f:
            error_log = f.read()

    result = analyze_runtime_error(error_log)

    print("=" * 60)
    print("Runtime Error Analysis Result")
    print("=" * 60)

    if result["has_errors"]:
        print(f"\nTotal matched error types: {result['total_matches']}")

        for error in result["matched_errors"]:
            print(f"\n[{error['error_type']}]")
            print(f"  Diagnosis: {error['diagnosis']}")
            print(f"  Confidence: {error['confidence']}")
            print(f"  Matched patterns: {error['matched_patterns']}")
            print("  Suggestions:")
            for i, suggestion in enumerate(error["suggestions"], 1):
                print(f"    {i}. {suggestion}")
    else:
        print("\nNo recognized error patterns found in the log.")
        print("Please check if the error log is complete.")

    print("=" * 60)