| [fix] Add FaultyTensorpipeAgent For TestCase
Co-authored-by: pengqi<pengqi33@huawei.com>
# message auto-generated for no-merge-commit merge:
!36104 merge v2.7.0_faultytensor into v2.7.1
[fix] Add FaultyTensorpipeAgent For TestCase
Created-by: pengqihw
Commit-by: pengqihw;pengqi
Merged-by: ascend-robot
Description: <!--
PR描述模板更新日期:20260203
-->
# 【合入来源】
> <font color="red">**如有社区issue,请关联issue链接**</font>\
> <font color="red">**请勿携带内部流程信息(需求链接、问题单、内部issue等)**</font>
用例中原生社区使用了FaultyTensorPipeAgent类进行延时注入,NPU 缺少 FaultyTensorPipeAgent,导致无法使用 FAULTY_TENSORPIPE 后端,messages_to_delay / messages_to_fail 配置无效。本次改动为 NPU 实现完整的 FaultyTensorPipeAgent 基础设施。
- [ ] 需求
- [ ] 问题单
- [x] issue/工单
- [ ] 重构优化
- [ ] 资料更新
# 【修改方案】
> 请描述修改内容的具体实现,涉及哪些组件之间进行交互,可以用1、2、3、...进行罗列\
> 如果是需求或者重构类的PR,需要补充详细设计文档(说明上下游组件关系、时序图、类图、DFX能力等内容)
参考原生社区实现为 NPU 添加 FaultyTensorPipeAgent 实现,使 FAULTY_TENSORPIPE 后端能在 NPU 上用于 RPC 故障注入测试。
## 修改范围(7 个文件)
### 新增(2 个)
faulty_tensorpipe_agent.h — FaultyTensorPipeAgent 类定义
- 继承 NPU TensorPipeAgent,使用 tensorpipe_npu::Pipe / tensorpipe_npu::Error
- send() override:根据 messagesToFail_ 配置决定是否注入发送失败
- pipeWrite() override:根据 messagesToDelay_ 配置在发送前注入延迟
faulty_tensorpipe_agent.cpp — 实现
- 构造函数使用 static_cast<TensorPipeRpcBackendOptions>(opts) 将基类部分切片拷贝传递给父类,避免 std::move(opts) 后访问 moved-from 成员的 undefined behavior(原生 PyTorch 存在同样的问题,NPU 实现中一并修复)
### 修改(5 个)
init.cpp — Python 绑定
- 注册 FaultyTensorPipeRpcBackendOptions 和 FaultyTensorPipeAgent 到 torch_npu._C._distributed_rpc
backend_registry.py — 后端注册
- 新增 _faulty_tensorpipe_construct_rpc_backend_options_handler:创建 FaultyTensorPipeRpcBackendOptions
- 新增 _faulty_tensorpipe_init_backend_handler:创建 FaultyTensorPipeAgent 并调用 api._init_rpc_states()。在创建 agent 前调用 _init_device_state() 初始化 NPU 设备(NPU TensorPipeAgent 构造依赖已初始化的 NPU 设备,原生 CUDA 无此需求)
- _rpc_backend_registry():注册 FAULTY_TENSORPIPE 后端;import torch.distributed.rpc._testing 并设置 is_available = lambda: False 阻止原生 PyTorch 的 FAULTY_TE- NSORPIPE 重复注册;若原生已提前注册则重建 BackendType Enum 替换为 NPU handler
tensorpipe_agent.cpp — Bug 修复
- 添加 #include <fmt/format.h>
- 超时错误消息修正:errorMsg = "" → fmt::format(kRpcTimeoutErrorStr, timeoutMetadata.timeout.count())。原生 PyTorch 会格式化为 "RPCErr:1:RPC ran for more than set timeout (500 ms)...",NPU 版本此前传空字符串导致异常消息为 "RPCErr:1:",测试 regex "RPC ran for more than" 无法匹配
tensorpipe_agent.h — 依赖补充
- 添加 using torch::distributed::rpc::kRpcTimeoutErrorStr;,与同文件中 kSecToMsConversion、kUnsetRpcTimeout 的引入方式一致
CMakeLists.txt — 编译配置
- FILE(GLOB ...) 添加 rpc/testing/*.cpp
### 关于 PYTHON_CALL vs SCRIPT_CALL
torch_npu.contrib.transfer_to_npu 全局替换 torch.jit.script 为 pass-through 包装器。MultiProcessTestCase 以 multiprocessing.spawn 启动子进程运行测试,父进程中调用 enable_jit_script() 设置的状态不会传递给子进程。子进程中所有 @torch.jit.script 装饰的函数均不编译,RPC 框架因此使用 PYTHON_CALL 消息类型。测试用例需将 messages_to_delay / messages_to_fail 配置为 PYTHON_CALL 来匹配实际的消息类型。
# 【资料变更】
> 请确认是否涉及资料变更。如涉及,需要在PR中体现,并简要说明修改内容。如不涉及,需填写“不涉及”
# 【接口变更】
> 请确认是否涉及跨代码仓或者客户面可见的接口变更。如涉及,需要详细说明接口以及对应的变更内容,同时需要在资料中体现。如不涉及,需填写“不涉及”
# 【功能验证】
> 说明测试场景,测试方法。如果本次测试方式与常规单元测试不同,请详细说明您的测试步骤\
> 新增/变更内容是否已新增/适配UT测试用例看护,并补充测试自验证截图

# 【CheckList】
> PR提交人对以下CheckList自检项进行全量自检,自检通过或不涉及,均修改 [ ] 为 [x]
- [x] 代码注释完备,正确记录错误日志
- [x] 代码实现进行了返回值、空指针等校验
- [x] PR标题正确使用类型标签,如:feat、fix、refactor、docs、test等
- [x] PR持续集成流水线(CI)执行通过,代码检查无异常
See merge request: Ascend/pytorch!36104 | 8 天前 |