| feat(dts): add DFX capabilities for DTS runtime
Co-authored-by: rookie_hongchuan<hongchuan6@h-partners.com>
# message auto-generated for no-merge-commit merge:
!396 merge feature/dts-collective-detection into master
feat(dts): add DFX capabilities for DTS runtime
Created-by: rookie_hongchuan
Commit-by: rookie_hongchuan
Merged-by: ascend-robot
Description: ## Summary
本次 DTS DFX 能力拓展包含两项独立但互补的改进:
### 1. 集合通信检测
新增 _collective_op_guard() 上下文管理器,在 DTS 并行波次共享任务执行期间检测非法的跨 rank 集合通信操作。
该 guard **仅**作用于并行波次调度器,顺序波次不拦截——因为全体 rank 都参与时集合通信不会挂死。
检测到非法操作时抛出 SchemaValidateError,错误信息包含精确的调用位置(文件、行号、函数名)。
### 2. 日志可审计
将多卡 DTS 运行时日志从多条 INFO 合并为一条 summary,包含完整性能信息:
- tasks — 总任务数
- my_tasks — 本 rank 实际执行任务数
- exec_s / my_exec_s — 全部 / 本 rank 执行耗时
- sync_s / my_sync_s — 全部 / 本 rank 同步耗时
- t_run_wall_s — 墙钟耗时
- speedup — 加速比 = sum_exec / t_run_wall(修复了之前公式相反的错误)
- queue — 共享策略(shared / static_rr)
同步耗时非零时,DEBUG 级别保留 exec_over_sync 比值和 not suitable for parallel 提示。
## 典型场景
### 集合通信检测
处理器提交的共享任务函数中意外使用了 torch.distributed 集合通信:
```python
def my_calib_step(module):
gathered = [torch.zeros_like(act) for _ in range(world_size)]
dist.all_gather(gathered, act)
return torch.stack(gathered).mean()
```
报错信息:
```
Code: 203, Message: DTS shared task contains illegal cross-rank collective:
torch.distributed.all_gather().
TIP: DTS shared tasks must be independently executable by any single rank;
...
```
### DTS summary 日志
多卡并行(4 tasks, 2 rank, 共享队列):
```
[DTS] Summary: tasks=4 world_size=2 rank=0 my_tasks=2 exec_s=0.2403 ... speedup=1.86 queue=shared
```
## 测试覆盖
- [x] 非分布式下 guard 不做拦截(4 用例)
- [x] 分布式下 broadcast / all_reduce / barrier / all_gather 等 10 种集合通信各自抛出 SchemaValidateError(2 rank gloo,10 用例)
- [x] 集成测试:并行调度器合法任务正常执行(2 rank)
- [x] 结构化验证:并行调度器 _execute_local_task 包含 guard,顺序调度器不包含
- [x] DTS summary 日志格式契约测试(多卡并行 3 场景 + 单卡串行)
- [x] 加速比公式正确性验证(total_exec / t_run_wall)
- [x] 真实双进程 gloo 多 rank 日志输出验证
See merge request: Ascend/msmodelslim!396 | 8 天前 |