oh-my-pi 任务系统 s07 分析与迭代开发计划

本文档分析了 oh-my-pi 当前任务系统与 learn-claude-code s07 文档(learn-claude-code\docs\zh\s07-task-system.md)设计的差异,并提出迭代开发计划以支持持久化任务图功能。任务系统的实现可以参考“claude_code\src"下的代码实现。


一、oh-my-pi 任务系统现状分析

1.1 当前实现定位

根据对照分析,oh-my-pi 当前实现的是:

功能 learn-claude-code oh-my-pi 实现位置
s03: 会话级任务列表 s03_todo_write.py todo-write.ts packages/coding-agent/src/tools/todo-write.ts
s04: 子代理并行执行 s04_subagent.py task 工具 packages/coding-agent/src/task/index.ts
s07: 持久化任务图 s07_task_system.py 未实现 -

结论:oh-my-pi 缺乏跨会话的持久化任务图支持(s07)。


1.2 当前双系统架构

oh-my-pi 将任务管理拆分为两个独立工具:

1.2.1 todo-write 工具(会话级)

特性

  • 存储位置:Session 会话中(通过 USER_TODO_EDIT_CUSTOM_TYPE 自定义类型)
  • 数据结构:分阶段(Phase)的任务列表
  • 状态机pending → in_progress → completed → abandoned(四态)
  • 操作类型initstartdonermdropappendnote
  • 持久化:依赖会话持久化,非独立磁盘文件
  • Markdown 双向转换:支持将任务列表导出为 Markdown 格式,并从 Markdown 导入(代码第 320-438 行,实际行号需确认)
    • phasesToMarkdown():将任务列表渲染为 Markdown checklist
    • markdownToPhases():解析 Markdown checklist 回任务列表
    • 支持状态标记:[ ] (pending)、[x] (completed)、[/] (in_progress)、[-] (abandoned)
    • 支持任务备注的 blockquote 格式(> text
  • 核心用途:会话内的任务列表管理和进度追踪

关键代码

// packages/coding-agent/src/tools/todo-write.ts
export type TodoStatus = "pending" | "in_progress" | "completed" | "abandoned";

export interface TodoItem {
  content: string;
  status: TodoStatus;
  notes?: string[]; // 备注
}

export interface TodoPhase {
  name: string;
  tasks: TodoItem[];
}

// Markdown 双向转换
export function phasesToMarkdown(phases: TodoPhase[]): string;
export function markdownToPhases(md: string): { phases: TodoPhase[]; errors: string[] };

1.2.2 task 工具(子代理执行)

特性

  • 核心功能:向子代理分发并行任务
  • 代理发现:从 ~/.omp/agent/agents/*.md.omp/agents/*.md 加载
  • 执行模式:同步执行 + 异步后台执行(async.enabled
  • 隔离机制:worktree/fuse-overlay/fuse-projfs 隔离执行环境
    • worktree 模式:创建 Git worktree 进行隔离(worktree.ts
    • fuse-overlay 模式:使用 FUSE overlayfs 实现文件系统隔离(worktree.ts
    • fuse-projfs 模式:使用 FUSE projfs 实现文件系统隔离(worktree.ts
    • 支持自动清理隔离环境(task/index.ts 第 1028-1029 行)
    • 支持两种结果合并模式:patch(补丁合并,第 991-1000 行)和 branch(分支合并,第 958-989 行)
  • 并行控制:支持并发数限制(task.maxConcurrency
  • 进度追踪:通过 AgentProgress 实时追踪每个子任务状态
  • 计划模式集成:当父会话启用计划模式时,自动注入计划模式提示词并限制可用工具(代码第 614-623 行)
    • 注入 plan-mode-subagent.md 提示词
    • 限制子代理只能使用只读工具:readsearchfindlspweb_search
    • 禁用子代理的 spawns 能力
  • 模型覆盖机制:支持多层级模型覆盖(代码第 626-635 行)
    • 优先级:设置覆盖 > agent 定义 > 父会话模型
    • 支持通过 task.agentModelOverrides 配置 per-agent 模型
    • 支持环境变量 PI_BLOCKED_AGENT 防止递归调用(task/index.ts

关键类型

// packages/coding-agent/src/task/types.ts
export interface AgentProgress {
  index: number;
  id: string;
  agent: string;
  status: "pending" | "running" | "completed" | "failed" | "aborted";
  recentTools: Array<{ tool: string; args: string; endMs: number }>;
  tokens: number;
  durationMs: number;
}

二、s07 文档设计思路

2.1 核心设计目标

根据 learn-claude-code/docs/zh/s07-task-system.md,任务系统的核心设计:

设计要点 文档描述
持久化 任务存储为磁盘上的 JSON 文件(.tasks/task_{id}.json
依赖图 支持 blockedBy 字段定义任务依赖关系
状态机 pending → in_progress → completed 三态转换
自动解锁 完成任务时自动从依赖任务的 blockedBy 中移除
工具集 task_createtask_updatetask_listtask_get
核心价值 "状态存活于压缩之外"——任务图比任何对话都持久

2.2 任务图示例

.tasks/
  task_1.json  {"id":1, "status":"completed"}
  task_2.json  {"id":2, "blockedBy":[1], "status":"pending"}
  task_3.json  {"id":3, "blockedBy":[1], "status":"pending"}
  task_4.json  {"id":4, "blockedBy":[2,3], "status":"pending"}

任务图 (DAG):
                 +----------+
            +--> | task 2   | --+
            |    | pending  |   |
+----------+     +----------+    +--> +----------+
| task 1   |                          | task 4   |
| completed| --> +----------+    +--> | blocked  |
+----------+     | task 3   | --+     +----------+
                 | pending  |
                 +----------+

顺序:   task 1 必须先完成,才能开始 2 和 3
并行:   task 2 和 3 可以同时执行
依赖:   task 4 要等 2 和 3 都完成
状态:   pending -> in_progress -> completed

三、异同对比

3.1 相同之处

维度 文档 oh-my-pi
状态追踪 pending → in_progress → completed pending → running → completed(todo-write 还有 abandoned
工具化 提供 task_create/update/list/get 提供 todo_writetask 工具
并行支持 任务图支持并行任务 task 工具支持并行子代理执行
进度可视化 列表显示状态和依赖 TUI 渲染任务状态和进度

3.2 关键差异

维度 文档思路 oh-my-pi 实现
持久化方式 独立 JSON 文件(.tasks/task_{id}.json 会话内存储(todo-write)或临时 artifacts
依赖管理 显式 blockedBy 数组定义依赖边 未实现显式依赖关系
依赖解锁 完成任务时自动清除下游依赖 未实现自动解锁机制
核心用途 管理跨会话的长期目标 管理会话内的任务列表 + 子代理分发
任务粒度 任意任务(可跨越多个会话) 子代理调用(单次会话内完成)
隔离执行 未提及 支持 worktree/fuse 隔离环境
异步执行 未提及 支持后台异步任务(async.enabled

3.3 架构差异对比

文档设计的任务图架构:

┌─────────────────────────────────────────────────────────────┐
│                     .tasks/ 目录                           │
├─────────────┬─────────────┬─────────────┬─────────────────┤
│ task_1.json │ task_2.json │ task_3.json │ task_4.json ... │
│ {"id":1,    │ {"id":2,    │ {"id":3,    │ {"id":4,        │
│  "status":  │  "status":  │  "status":  │  "status":      │
│  "completed"│  "pending", │  "pending", │  "pending",     │
│  ...}       │  "blockedBy"│  "blockedBy"│  "blockedBy"    │
│             │  :[1]}      │  :[1]}      │  :[2,3]}        │
└─────────────┴─────────────┴─────────────┴─────────────────┘
         │               │               │
         └───────────────┼───────────────┘
                         ▼
                依赖图自动解锁
                (完成时触发)

oh-my-pi 的双系统架构:

┌─────────────────────────────────────────────────────────────────┐
│                    todo-write 工具(会话级任务列表)              │
├─────────────────────────────────────────────────────────────────┤
│ • Phase 分组管理                                                 │
│ • 四态状态机:pending → in_progress → completed → abandoned     │
│ • Markdown 双向转换                                              │
│   ├─ phasesToMarkdown(): 导出为 Markdown checklist               │
│   └─ markdownToPhases(): 从 Markdown 导入任务列表                │
│ • Session 存储(USER_TODO_EDIT_CUSTOM_TYPE)                     │
│ • 任务备注支持(notes 字段)                                     │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
                    会话生命周期内跟踪开发进度


┌─────────────────────────────────────────────────────────────────┐
│                    task 工具(子代理并行执行)                    │
├─────────────────────────────────────────────────────────────────┤
│ • Agent 发现机制                                                 │
│   ├─ Bundled agents(内置)                                      │
│   ├─ User agents(~/.omp/agent/agents/*.md)                     │
│   └─ Project agents(.omp/agents/*.md)                          │
│ • 执行模式                                                       │
│   ├─ 同步执行(blocking)                                        │
│   └─ 异步后台执行(async.enabled)                               │
│ • 隔离执行环境                                                   │
│   ├─ worktree 模式:Git worktree 隔离                           │
│   ├─ fuse-overlay 模式:FUSE overlayfs 隔离                     │
│   └─ fuse-projfs 模式:FUSE projfs 隔离                         │
│ • 结果合并模式                                                   │
│   ├─ patch 模式:生成并应用补丁                                 │
│   └─ branch 模式:创建并合并分支                                │
│ • 高级特性                                                       │
│   ├─ 计划模式集成(只读工具限制)                                │
│   ├─ 模型覆盖机制(多层级优先级)                                │
│   ├─ 并发控制(task.maxConcurrency)                            │
│   └─ 进度追踪(AgentProgress 实时更新)                          │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
                    单次会话内分发并行子任务
                    (支持隔离执行和结果合并)

四、设计决策分析

4.1 为什么 oh-my-pi 没有实现持久化任务图?

  1. 关注点分离:将任务管理拆分为两个独立功能

    • todo-write:专注于会话内的任务列表管理
    • task:专注于子代理的并行执行调度
  2. 架构取舍

    • 文档的设计是任务持久化优先,适合长期项目追踪
    • oh-my-pi 的设计是会话隔离优先,任务与会话生命周期绑定
  3. 实际使用场景

    • 文档的任务图适合"跨会话的大目标拆分"
    • oh-my-pi 的设计更适合"单会话内的任务分发"

4.2 oh-my-pi 的创新点

特性 说明
隔离执行 通过 worktree/fuse 实现任务间的文件系统隔离
异步后台 支持任务后台执行,不阻塞主会话
Agent 发现 支持用户/项目级别的自定义代理定义
进度追踪 实时追踪每个子任务的工具调用、token 消耗、耗时
结果合并 支持 patch/branch 两种模式合并隔离任务结果

五、总结

对比维度 文档(s07_task_system.py) oh-my-pi
持久化 磁盘 JSON 文件,跨会话存活 会话内存储,会话结束失效
依赖管理 显式 blockedBy + 自动解锁 未实现依赖关系
核心价值 长期任务追踪 子代理并行执行
状态数 3 态 todo-write: 4 态;task: 5 态
隔离能力 支持 worktree/fuse 隔离
异步支持 支持后台异步执行

核心结论:oh-my-pi 没有直接实现文档中的"持久化任务图"概念,而是将任务管理拆分为两个更专注的工具:

  1. todo-write:会话内的结构化任务列表(类似 s03 的 TodoManager 升级版)
  2. task:子代理并行执行框架(全新功能,文档未提及)

这种设计更符合实际使用场景——短期会话内的任务追踪和并行执行,而非跨会话的长期目标管理。


六、支持 s07 持久化任务图的迭代开发计划

迭代依赖关系图

迭代 1 (持久化层)
    │
    ▼
迭代 2 (依赖管理) ───┐
    │                │
    ▼                │
迭代 3 (TaskManager) ◄┘
    │
    ├──────────────┬──────────────┐
    ▼              ▼              ▼
迭代 4 (工具)   迭代 5 (TUI)   迭代 6 (执行集成)
    │              │              │
    └──────────────┴──────────────┘
                   │
                   ▼
           迭代 7 (会话集成)

推荐实施顺序

Phase 1 - 基础层(迭代 1-3)

  • 先实现核心数据结构和业务逻辑
  • 不急于暴露给用户,确保稳定性

Phase 2 - 工具层(迭代 4-5)

  • 提供用户可交互的工具命令
  • 可视化增强用户体验

Phase 3 - 集成层(迭代 6-7)

  • 与现有功能深度集成
  • 实现完整的跨会话工作流

迭代 1:持久化存储层(Task Persistence Layer)

目标:实现任务文件的 CRUD 操作,不改变现有工具行为

范围

  • 新增 packages/coding-agent/src/task/persistence.ts
  • 实现 TaskPersistence 类,负责读写 .tasks/task_{id}.json
  • 支持任务 ID 生成、加载、保存

核心接口

// 参考 packages/coding-agent/src/task/types.ts 的类型定义风格
import * as z from "zod/v4";

/** 任务状态枚举(参考 swarm-extension 扩展) */
export const TaskStatus = z.enum([
  "pending",      // 待执行
  "waiting",      // 等待依赖(新增,参考 swarm-extension/state.ts)
  "in_progress",  // 执行中
  "completed",    // 已完成
  "failed",       // 失败(新增,参考 swarm-extension/state.ts)
  "aborted",      // 已中止(新增,参考 swarm-extension/state.ts)
  "abandoned",    // 已放弃(从 todo-write 继承)
]).describe("Task status");
export type TaskStatus = z.infer<typeof TaskStatus>;

/** 任务数据结构(参考 swarm-extension 双向依赖设计) */
export const TaskSchema = z.object({
  id: z.number().describe("Unique task identifier"),
  subject: z.string().describe("Task title/summary"),
  description: z.optional(z.string().describe("Detailed task description")),
  status: TaskStatus,
  blockedBy: z.array(z.number()).describe("List of task IDs this task depends on"),
  blocks: z.array(z.number()).describe("List of task IDs blocked by this task (双向依赖,参考 swarm-extension/dag.ts)"),
  reportsTo: z.array(z.number()).describe("List of task IDs this task reports to (隐式依赖,参考 swarm-extension/dag.ts)"),
  createdAt: z.number().describe("Unix timestamp in milliseconds"),
  updatedAt: z.number().describe("Unix timestamp in milliseconds"),
});
export type Task = z.infer<typeof TaskSchema>;

/** 任务过滤器 */
export const TaskFilterSchema = z.object({
  status: z.optional(TaskStatus),
  blocked: z.optional(z.boolean().describe("Filter by blocked status")),
});
export type TaskFilter = z.infer<typeof TaskFilterSchema>;

/** 持久化错误类型 */
export const TaskPersistenceErrorSchema = z.object({
  code: z.enum([
    "ENOENT",           // 任务不存在
    "EEXIST",           // 任务已存在
    "ELOCKED",          // 文件被锁定
    "EIO",              // I/O 错误
    "ENOSPC",           // 磁盘空间不足
    "EINVALID",         // 无效数据格式
    "ECONCURRENT",      // 并发写入冲突
  ]).describe("错误码"),
  message: z.string().describe("错误描述"),
  taskId: z.optional(z.number()).describe("相关任务ID"),
  cause: z.optional(z.string()).describe("根本原因"),
});
export type TaskPersistenceError = z.infer<typeof TaskPersistenceErrorSchema>;

/** 持久化操作结果 */
export type PersistenceResult<T> = 
  | { success: true; data: T }
  | { success: false; error: TaskPersistenceError };

/** 持久化接口 */
export interface TaskPersistence {
  /**
   * 创建新任务
   * @param subject 任务标题
   * @param description 任务描述(可选)
   * @param blockedBy 依赖的任务ID列表(可选)
   * @returns 创建的任务或错误信息
   */
  create(subject: string, description?: string, blockedBy?: number[]): PersistenceResult<Task>;
  
  /**
   * 加载任务
   * @param taskId 任务ID
   * @returns 任务对象或错误信息,任务不存在时返回 null
   */
  load(taskId: number): PersistenceResult<Task | null>;
  
  /**
   * 保存任务(带文件锁保护)
   * @param task 任务对象
   * @param timeoutMs 锁等待超时时间(默认 5000ms)
   * @returns 保存结果或错误信息
   */
  save(task: Task, timeoutMs?: number): PersistenceResult<void>;
  
  /**
   * 列出任务
   * @param filter 过滤条件(可选)
   * @returns 任务列表或错误信息
   */
  list(filter?: TaskFilter): PersistenceResult<Task[]>;
  
  /**
   * 删除任务
   * @param taskId 任务ID
   * @returns 删除结果或错误信息
   */
  delete(taskId: number): PersistenceResult<void>;
  
  /**
   * 获取下一个可用的任务ID
   * @returns 下一个任务ID或错误信息
   */
  getNextId(): PersistenceResult<number>;
  
  /**
   * 尝试获取文件锁
   * @param taskId 任务ID
   * @param timeoutMs 超时时间
   * @returns 锁句柄或错误信息
   */
  tryLock(taskId: number, timeoutMs?: number): PersistenceResult<number>;
  
  /**
   * 释放文件锁
   * @param lockHandle 锁句柄
   */
  unlock(lockHandle: number): void;
}

设计说明

  • 使用 Zod v4 进行类型定义,与现有代码风格保持一致(参考 task/types.tstools/todo-write.ts
  • 使用 z.enum() 定义枚举类型
  • 使用 z.object() 定义复杂对象
  • 使用 .describe() 添加字段描述,支持 OpenAPI 文档生成
  • 错误处理:定义统一的 TaskPersistenceError 错误类型,包含错误码和详细描述
  • 并发控制:使用 fs.promises.lock() 实现文件级锁,支持超时机制
  • 原子写入:使用 writeFileflag: 'wx' 确保原子性,配合文件锁防止并发写入
  • 接口契约:每个方法都有明确的 JSDoc 注释,定义输入参数和返回值语义

文件锁实现细节

// packages/coding-agent/src/task/persistence.ts - 文件锁实现
async function acquireLock(taskId: number, timeoutMs: number): Promise<number> {
  const lockPath = `${TASKS_DIR}/task_${taskId}.lock`;
  const startTime = Date.now();
  
  while (Date.now() - startTime < timeoutMs) {
    try {
      // 使用 wx 标志确保只创建新文件(原子操作)
      const fd = await fs.promises.open(lockPath, 'wx');
      await fd.close();
      return taskId; // 返回锁句柄
    } catch (err) {
      if (err.code === 'EEXIST') {
        // 锁已存在,等待后重试
        await new Promise(resolve => setTimeout(resolve, 50));
        continue;
      }
      throw err;
    }
  }
  
  throw new Error(`Timeout waiting for lock on task ${taskId}`);
}

async function releaseLock(taskId: number): Promise<void> {
  const lockPath = `${TASKS_DIR}/task_${taskId}.lock`;
  await fs.promises.unlink(lockPath).catch(() => {});
}

异常场景处理

异常场景 检测方式 处理策略
磁盘空间不足 ENOSPC 错误码 返回 TaskPersistenceError,提示用户清理空间
文件损坏 JSON 解析失败 返回 EINVALID 错误,保留损坏文件供后续分析
网络分区 EIO 错误码 重试 3 次后返回错误,记录日志
并发写入冲突 锁获取失败 返回 ECONCURRENT 错误,建议重试

验收标准

  • 单元测试覆盖 CRUD 操作
  • 任务文件正确序列化到 .tasks/ 目录
  • 重启后能正确加载已有任务
  • 任务 ID 自增逻辑正确
  • 类型定义通过 TypeScript 编译检查
  • 并发写入冲突测试(多进程同时写入同一任务)
  • 文件锁超时测试
  • 磁盘空间不足场景测试
  • 网络分区场景测试(模拟 I/O 错误)

内聚性:仅负责文件 I/O,不涉及业务逻辑


迭代 2:依赖图管理(Dependency Graph Manager)

目标:实现任务依赖关系的管理和自动解锁

范围

  • 新增 packages/coding-agent/src/task/dependency-manager.ts
  • 实现依赖添加、移除、验证
  • 实现完成任务时的自动解锁逻辑

核心接口

// 参考 packages/coding-agent/src/task/types.ts 的类型定义风格
import * as z from "zod/v4";

/** 依赖图节点 */
export const DependencyNodeSchema = z.object({
  taskId: z.number().describe("Task ID"),
  dependsOn: z.array(z.number()).describe("Task IDs this task depends on"),
  blocks: z.array(z.number()).describe("Task IDs blocked by this task"),
});
export type DependencyNode = z.infer<typeof DependencyNodeSchema>;

/** 执行模式(参考 swarm-extension/schema.ts) */
export const ExecutionModeSchema = z.enum([
  "default",     // 默认模式,按依赖顺序执行
  "pipeline",    // 管道模式,支持多次迭代
  "parallel",    // 并行模式,无依赖任务同时执行
  "sequential",  // 顺序模式,按声明顺序执行
]).describe("Task execution mode");
export type ExecutionMode = z.infer<typeof ExecutionModeSchema>;

/** 执行波次(参考 swarm-extension/dag.ts buildExecutionWaves) */
export const ExecutionWaveSchema = z.object({
  waveIndex: z.number().describe("波次序号"),
  taskIds: z.array(z.number()).describe("本波次可并行执行的任务ID列表"),
});
export type ExecutionWave = z.infer<typeof ExecutionWaveSchema>;

/** 依赖管理错误类型 */
export const DependencyErrorSchema = z.object({
  code: z.enum([
    "E_CYCLE",          // 循环依赖
    "E_NOT_FOUND",      // 任务不存在
    "E_INVALID_DEP",    // 无效的依赖关系(自依赖等)
    "E_LOCKED",         // 任务被锁定
  ]).describe("错误码"),
  message: z.string().describe("错误描述"),
  taskId: z.optional(z.number()).describe("相关任务ID"),
  relatedTaskId: z.optional(z.number()).describe("相关依赖任务ID"),
  cycle: z.optional(z.array(z.number())).describe("循环路径(仅循环依赖错误)"),
});
export type DependencyError = z.infer<typeof DependencyErrorSchema>;

/** 依赖管理操作结果 */
export type DependencyResult<T> =
  | { success: true; data: T }
  | { success: false; error: DependencyError };

/** 循环依赖错误 */
export const CycleDetectedErrorSchema = z.object({
  cycle: z.array(z.number()).describe("Task IDs forming a cycle"),
  message: z.string().describe("错误描述"),
});
export type CycleDetectedError = z.infer<typeof CycleDetectedErrorSchema>;

/** 依赖管理器接口 */
export interface DependencyManager {
  /**
   * 添加依赖关系
   * @param taskId 依赖方任务ID
   * @param dependsOnTaskId 被依赖方任务ID
   * @returns 添加结果或错误信息
   * @throws E_CYCLE 如果添加后形成循环依赖
   * @throws E_NOT_FOUND 如果任一任务不存在
   * @throws E_INVALID_DEP 如果尝试添加自依赖
   */
  addDependency(taskId: number, dependsOnTaskId: number): DependencyResult<void>;
  
  /**
   * 移除依赖关系
   * @param taskId 依赖方任务ID
   * @param dependsOnTaskId 被依赖方任务ID
   * @returns 移除结果或错误信息
   * @throws E_NOT_FOUND 如果任务不存在或依赖关系不存在
   */
  removeDependency(taskId: number, dependsOnTaskId: number): DependencyResult<void>;
  
  /**
   * 完成任务并自动解锁下游任务
   * @param taskId 任务ID
   * @returns 解锁的下游任务列表或错误信息
   * @throws E_NOT_FOUND 如果任务不存在
   */
  completeTask(taskId: number): DependencyResult<Task[]>;
  
  /**
   * 获取被阻塞的任务(blockedBy 非空)
   * @returns 被阻塞的任务列表
   */
  getBlockedTasks(): Task[];
  
  /**
   * 获取可执行的任务(pending 且无依赖)
   * @returns 可执行的任务列表
   */
  getAvailableTasks(): Task[];
  
  /**
   * 验证依赖图是否为 DAG(无环)
   * @returns 验证结果,包含是否有效和循环路径(如果存在)
   */
  validateDAG(): { valid: boolean; cycle?: number[] };
  
  /**
   * 获取依赖图拓扑排序
   * @returns 拓扑排序后的任务ID数组,如果存在循环则返回 null
   */
  getTopologicalOrder(): number[] | null;
  
  /**
   * 生成执行波次(参考 swarm-extension/dag.ts buildExecutionWaves)
   * @returns 执行波次数组,同一波次内的任务可并行执行
   */
  getExecutionWaves(): ExecutionWave[];
  
  /**
   * 获取当前可执行的任务(pending/waiting 且所有依赖已完成)
   * @param completedTaskIds 已完成的任务ID集合
   * @returns 当前可执行的任务列表
   */
  getReadyTasks(completedTaskIds: Set<number>): Task[];
  
  /**
   * 设置执行模式
   * @param mode 执行模式
   */
  setExecutionMode(mode: ExecutionMode): void;
  
  /**
   * 获取任务的所有依赖(包括间接依赖)
   * @param taskId 任务ID
   * @returns 依赖的任务ID列表(去重)
   * @throws E_NOT_FOUND 如果任务不存在
   */
  getAllDependencies(taskId: number): DependencyResult<number[]>;
  
  /**
   * 获取任务的所有下游任务(包括间接下游)
   * @param taskId 任务ID
   * @returns 下游任务ID列表(去重)
   * @throws E_NOT_FOUND 如果任务不存在
   */
  getAllDownstream(taskId: number): DependencyResult<number[]>;
  
  /**
   * 获取任务的直接依赖数
   * @param taskId 任务ID
   * @returns 直接依赖数量
   */
  getDirectDependencyCount(taskId: number): number;
  
  /**
   * 检查任务是否被指定任务阻塞
   * @param taskId 任务ID
   * @param blockerId 阻塞者任务ID
   * @returns 是否被阻塞
   */
  isBlockedBy(taskId: number, blockerId: number): boolean;
  
  /**
   * 批量完成多个任务
   * @param taskIds 任务ID列表
   * @param cascade 是否级联解锁下游任务(默认 true)
   * @returns 完成的任务列表和解锁的任务列表
   */
  completeTasks(taskIds: number[], cascade?: boolean): DependencyResult<{
    completed: Task[];
    unlocked: Task[];
  }>;
}

设计说明

  • 使用 Zod v4 定义数据结构,与现有代码风格一致
  • 错误处理:定义统一的 DependencyError 错误类型,包含错误码和详细描述
  • validateDAG() 返回结构化错误信息而非抛出异常
  • 添加 getTopologicalOrder() 支持拓扑排序(参考现有并行执行逻辑)
  • 添加 getExecutionWaves() 支持执行波次生成(参考 swarm-extension/dag.ts)
  • 添加 getAllDependencies()getAllDownstream() 支持深度依赖查询
  • 添加 getDirectDependencyCount()isBlockedBy()completeTasks() 等辅助方法
  • 接口方法设计参考 task/index.ts 的错误处理模式
  • 每个方法都有明确的 JSDoc 注释,定义输入参数、返回值和可能的错误情况

执行波次实现(参考 swarm-extension/dag.ts)

// packages/coding-agent/src/task/dependency-manager.ts - 执行波次生成
function buildExecutionWaves(deps: Map<number, Set<number>>): ExecutionWave[] {
  const waves: ExecutionWave[] = [];
  const completed = new Set<number>();
  const remaining = new Set(deps.keys());
  
  while (remaining.size > 0) {
    const wave: number[] = [];
    // 找出所有依赖已满足的节点
    for (const node of remaining) {
      const nodeDeps = deps.get(node)!;
      let ready = true;
      for (const dep of nodeDeps) {
        if (!completed.has(dep)) {
          ready = false;
          break;
        }
      }
      if (ready) {
        wave.push(node);
      }
    }
    
    if (wave.length === 0) {
      // 无法继续,存在循环依赖或孤立节点
      break;
    }
    
    waves.push({ waveIndex: waves.length, taskIds: wave });
    for (const node of wave) {
      remaining.delete(node);
      completed.add(node);
    }
  }
  
  return waves;
}

任务恢复机制

// packages/coding-agent/src/task/dependency-manager.ts - 任务恢复
interface RecoveryState {
  taskId: number;
  originalStatus: TaskStatus;
  blockedByBefore: number[];
  timestamp: number;
}

class DependencyManagerImpl implements DependencyManager {
  private recoveryLog: RecoveryState[] = [];
  
  /**
   * 记录任务状态变更前的状态(用于恢复)
   */
  private logRecoveryState(task: Task, blockedByBefore: number[]): void {
    this.recoveryLog.push({
      taskId: task.id,
      originalStatus: task.status,
      blockedByBefore,
      timestamp: Date.now(),
    });
    
    // 保留最近 100 条恢复记录
    if (this.recoveryLog.length > 100) {
      this.recoveryLog.shift();
    }
  }
  
  /**
   * 撤销最近的操作
   * @param steps 撤销步数(默认 1)
   * @returns 撤销的任务数量
   */
  undo(steps: number = 1): number {
    let undoneCount = 0;
    for (let i = 0; i < steps && this.recoveryLog.length > 0; i++) {
      const state = this.recoveryLog.pop()!;
      const task = this.taskPersistence.load(state.taskId);
      if (task.success && task.data) {
        task.data.status = state.originalStatus;
        task.data.blockedBy = [...state.blockedByBefore];
        this.taskPersistence.save(task.data);
        undoneCount++;
      }
    }
    return undoneCount;
  }
  
  /**
   * 清理过期的恢复日志
   * @param maxAgeMs 最大保留时间(默认 1 小时)
   */
  cleanupRecoveryLog(maxAgeMs: number = 3600000): void {
    const cutoff = Date.now() - maxAgeMs;
    this.recoveryLog = this.recoveryLog.filter(r => r.timestamp > cutoff);
  }
}

验收标准

  • 单元测试验证依赖添加/移除
  • 完成任务后,下游任务的 blockedBy 自动移除该任务
  • 检测循环依赖并返回循环路径
  • 正确识别可执行任务
  • 拓扑排序结果正确
  • 循环依赖检测测试(多种循环模式)
  • 任务恢复机制测试(undo 操作)
  • 批量完成任务测试
  • 自依赖检测测试

内聚性:仅处理依赖逻辑,不涉及文件 I/O


迭代 3:TaskManager 整合层(低侵入)

目标:整合持久化和依赖管理,提供统一的任务管理接口

范围

  • 新增 packages/coding-agent/src/task/task-manager.ts(完全独立模块)
  • 组合 TaskPersistenceDependencyManager
  • 不修改现有 task/index.ts

核心接口

export class TaskManager {
  private persistence: TaskPersistence;
  private dependencyManager: DependencyManager;
  
  // CRUD(委托模式,不直接实现)
  create(subject: string, description?: string, blockedBy?: number[]): Task;
  update(taskId: number, updates: Partial<Task>): Task;
  complete(taskId: number): void;
  get(taskId: number): Task;
  list(filter?: TaskFilter): Task[];
  
  // 依赖查询
  getAvailableTasks(): Task[];
  getBlockedTasks(): Task[];
  getDependencyGraph(): DependencyGraph;
  
  // 单例访问
  static getInstance(): TaskManager;
}

设计原则

  • 零侵入:不修改现有 task/index.ts
  • 单例模式:全局唯一实例,便于集成
  • 委托模式:业务逻辑委托给底层模块
  • 依赖注入:支持测试时注入 mock 实现

验收标准

  • 单元测试覆盖所有接口
  • 与现有 todo-write 工具共存,不冲突
  • 支持任务状态转换验证(不能从 completed 回到 pending)
  • 集成测试验证完整工作流

内聚性:业务逻辑层,协调持久化和依赖管理


迭代 4:新增 task 工具命令(低侵入)

目标:扩展 task 工具,支持 s07 的任务管理命令

范围

  • 新增 packages/coding-agent/src/tools/task-graph.ts(完全独立工具)
  • 不修改现有 task/types.tstask/index.ts
  • 通过适配器模式桥接新旧工具

新增工具命令

// task_graph_create
{ name: "task_graph_create", params: { subject: string, description?: string, blockedBy?: number[] } }

// task_graph_update
{ name: "task_graph_update", params: { taskId: number, status?: TaskStatus, addBlockedBy?: number[], removeBlockedBy?: number[] } }

// task_graph_list
{ name: "task_graph_list", params: { filter?: "all" | "pending" | "blocked" | "completed" } }

// task_graph_get
{ name: "task_graph_get", params: { taskId: number } }

// task_graph_complete
{ name: "task_graph_complete", params: { taskId: number } }

设计原则

  • 零侵入:不修改现有 task/types.tstask/index.ts
  • 独立工具:新功能作为独立工具实现
  • 命令隔离:使用 task_graph_ 前缀区分新旧命令
  • 适配器桥接:通过 TaskGraphAdapter 连接任务图和会话

验收标准

  • 每个工具命令有独立的单元测试
  • 工具描述文档更新(prompts/tools/task-graph.md
  • TUI 渲染器支持新工具的展示
  • 与现有 task 工具共存,命令不冲突

内聚性:工具层,仅负责参数验证和调用 TaskManager


迭代 5:TUI 渲染器增强

目标:在终端界面中可视化任务图和依赖关系

范围

  • 修改 packages/coding-agent/src/task/render.ts
  • 新增任务板(Task Board)视图
  • 支持依赖关系可视化

渲染模式

任务板视图:
┌─────────────────────────────────────────┐
│ 待办 (Pending)                          │
├─────────────────────────────────────────┤
│ [ ] #1: Setup project                   │
│ [ ] #2: Write code (blocked by: 1)      │
│ [ ] #3: Write tests (blocked by: 1)     │
│ [ ] #4: Deploy (blocked by: 2, 3)       │
└─────────────────────────────────────────┘

依赖图视图:
    ┌──────────┐
    │ Task 1   │
    │ [✓]      │
    └────┬─────┘
         │
    ┌────┴─────┐
    ▼          ▼
┌──────────┐ ┌──────────┐
│ Task 2   │ │ Task 3   │
│ [>]      │ │ [ ]      │
└────┬─────┘ └────┬─────┘
         │        │
         └────────┘
              │
              ▼
         ┌──────────┐
         │ Task 4   │
         │ [ ]      │
         └──────────┘

验收标准

  • 支持折叠/展开依赖关系
  • 不同状态用不同颜色标识
  • 支持显示任务阻塞原因

内聚性:仅负责 UI 渲染,不涉及业务逻辑


迭代 6:与现有 task 工具集成(低侵入)

目标:将持久化任务图与现有子代理执行功能结合

范围

  • 新增 packages/coding-agent/src/task/task-graph-executor.ts(独立执行器)
  • 不修改现有 task/executor.ts
  • 通过可选参数和回调实现集成

集成点

// 新增独立执行器(不修改现有 executor.ts)
export class TaskGraphExecutor {
  static async executeAvailableTasks(agent: string): Promise<void> {
    const taskManager = TaskManager.getInstance();
    const availableTasks = taskManager.getAvailableTasks();
    
    for (const task of availableTasks) {
      taskManager.update(task.id, { status: "in_progress" });
      
      // 使用现有 runSubprocess,通过回调追踪状态
      const result = await runSubprocess({ 
        agent, 
        task: task.subject,
        onStatusChange: (status) => {
          if (status === "completed") {
            taskManager.complete(task.id); // 自动解锁下游
          }
        }
      });
      
      if (result.exitCode !== 0) {
        taskManager.update(task.id, { status: "failed" });
      }
    }
  }
}

设计原则

  • 零侵入:不修改现有 executor.ts
  • 独立模块:新执行逻辑封装在独立类中
  • 回调机制:通过 onStatusChange 回调追踪任务状态
  • 组合模式:复用现有 runSubprocess 而不修改它

验收标准

  • 集成测试验证完整执行流程
  • 任务执行失败时不标记为完成
  • 支持批量执行多个可用任务
  • 不影响现有 runSubprocess 功能

内聚性:执行协调层,连接任务图和子代理执行


迭代 7:会话持久化集成(低侵入)

目标:任务图在会话重启后仍然可用

范围

  • 复用现有 session-manager.tsCustomEntry 机制
  • 不修改 session-manager.ts 核心逻辑
  • 通过适配器模式实现任务图持久化

集成点

// 新增独立持久化适配器(不修改 session-manager.ts)
export class TaskGraphSessionAdapter {
  private static CUSTOM_TYPE = "task-graph-v1";
  
  static async saveToSession(sessionManager: SessionManager, graph: TaskGraph): Promise<void> {
    // 使用现有 CustomEntry 机制存储任务图
    await sessionManager.appendCustomEntry(TaskGraphSessionAdapter.CUSTOM_TYPE, {
      graph,
      timestamp: Date.now(),
    });
  }
  
  static loadFromSession(sessionManager: SessionManager): TaskGraph | null {
    // 从 CustomEntry 加载任务图
    const entries = sessionManager.getEntries();
    const graphEntry = entries
      .filter(e => e.type === "custom" && e.customType === TaskGraphSessionAdapter.CUSTOM_TYPE)
      .pop();
    
    return graphEntry?.data?.graph ?? null;
  }
  
  // 支持 task:// URI 读取任务(通过 URL 处理器扩展)
  static handleTaskUri(url: URL): Task | null {
    const taskId = parseInt(url.pathname.slice(1));
    const taskManager = TaskManager.getInstance();
    return taskManager.get(taskId);
  }
}

设计原则

  • 零侵入:不修改现有 session-manager.ts
  • 复用机制:利用 CustomEntry 存储任务图数据
  • 适配器模式:通过独立适配器连接会话和任务图
  • 解耦设计:任务图生命周期与会话解耦

验收标准

  • 会话重启后任务图完整恢复
  • 支持 read task://1 读取任务详情
  • 任务状态与会话状态解耦
  • 不影响现有会话持久化机制

内聚性:会话适配层,负责任务图与会话的桥接


六、低侵入性重构方案总结

6.1 总体架构设计

采用适配器模式 + 独立模块的策略,将任务图功能隔离在独立模块中:

┌─────────────────────────────────────────────────────────────┐
│                     新增独立模块                              │
│  task/task-graph/                                          │
│    ├── types.ts           # 任务图类型定义(独立于现有types)   │
│    ├── persistence.ts     # 任务图持久化逻辑                   │
│    ├── dependency.ts      # 依赖管理与DAG处理                  │
│    ├── manager.ts         # 任务图管理器(核心)                │
│    ├── executor.ts        # 任务图执行器                       │
│    └── session-adapter.ts # 会话适配器                        │
└─────────────────────────────────────────────────────────────┘
                           ↓ 适配器层
┌─────────────────────────────────────────────────────────────┐
│                     现有代码(最小修改)                       │
│  task/types.ts         ← 扩展接口(不修改原有定义)             │
│  task/index.ts         ← 注入适配器(单点接入)                 │
│  task/executor.ts      ← 新增可选参数(不修改核心逻辑)          │
│  session-manager.ts    ← 复用 CustomEntry(零侵入)             │
└─────────────────────────────────────────────────────────────┘

6.2 侵入性对比表

文件 原有修改方式 低侵入方案 侵入程度
task/types.ts 修改/扩展现有接口 新增独立类型文件 零侵入
task/index.ts 修改核心函数逻辑 包装模式 + 条件注入 极低
task/executor.ts 修改执行流程 可选参数 + 回调
session-manager.ts 新增 Entry 类型 复用 CustomEntry 零侵入

6.3 关键设计原则

原则 说明
开闭原则 对扩展开放(新增模块),对修改关闭(不改动现有代码)
单一职责 任务图逻辑集中在独立模块,现有模块保持原有职责
依赖倒置 任务图模块依赖抽象接口,不依赖具体实现
渐进式引入 通过配置开关控制启用,支持灰度发布

6.4 新增文件清单

文件路径 功能描述 状态
task/task-graph/types.ts 任务图类型定义 新增
task/task-graph/persistence.ts 持久化存储层 新增
task/task-graph/dependency.ts 依赖图管理 新增
task/task-graph/manager.ts 任务图管理器 新增
task/task-graph/executor.ts 任务图执行器 新增
task/task-graph/session-adapter.ts 会话适配器 新增
tools/task-graph.ts 任务图工具命令 新增

6.5 优势总结

优势 说明
风险隔离 新功能失败不影响现有任务执行
易于回滚 只需移除新模块,无需恢复修改
代码清晰 职责边界明确,易于维护
测试友好 新模块可独立测试,不影响原有测试套件
渐进交付 支持灰度发布和 A/B 测试

七、风险与缓解

7.1 风险矩阵

风险类型 风险描述 严重程度 发生概率 缓解措施
并发冲突 多个会话同时写入同一任务文件 文件锁 + 原子写入 + 乐观锁
数据损坏 磁盘空间不足、网络分区导致写入中断 原子写入 + 备份机制
性能瓶颈 任务图过大影响加载和查询性能 增量加载 + 任务归档
循环依赖 依赖图中存在循环导致死锁 DAG 验证 + 早期检测
数据丢失 异常退出导致未保存的修改丢失 自动保存 + 定期备份

7.2 并发控制方案

文件锁实现

  • 使用 fs.promises.open()wx 标志创建锁文件(原子操作)
  • 锁文件路径:.tasks/task_{id}.lock
  • 支持可配置的超时机制(默认 5000ms)
  • 自动清理过期锁(超过 30 秒未释放视为失效)

乐观锁机制

// 乐观锁版本控制
export const TaskWithVersionSchema = TaskSchema.extend({
  version: z.number().default(1).describe("版本号,每次更新递增"),
});

// 更新时验证版本
async function updateTask(task: Task): PersistenceResult<void> {
  const existing = await this.load(task.id);
  if (!existing.success || !existing.data) {
    return { success: false, error: { code: "ENOENT", message: "任务不存在" } };
  }
  
  if (existing.data.version !== task.version) {
    return { 
      success: false, 
      error: { 
        code: "ECONCURRENT", 
        message: "任务已被其他会话修改",
        taskId: task.id 
      } 
    };
  }
  
  task.version++;
  return this.save(task);
}

并发写入测试用例

// packages/coding-agent/src/task/__tests__/persistence-concurrent.test.ts
describe("并发写入测试", () => {
  it("多个进程同时写入同一任务应检测冲突", async () => {
    const task = await taskPersistence.create("Test Task");
    expect(task.success).toBe(true);
    
    // 模拟两个会话同时修改
    const session1Task = { ...task.data!, version: 1, subject: "Session 1" };
    const session2Task = { ...task.data!, version: 1, subject: "Session 2" };
    
    const result1 = await taskPersistence.save(session1Task);
    expect(result1.success).toBe(true);
    
    const result2 = await taskPersistence.save(session2Task);
    expect(result2.success).toBe(false);
    expect(result2.error?.code).toBe("ECONCURRENT");
  });
  
  it("文件锁应防止同时写入", async () => {
    const task = await taskPersistence.create("Locked Task");
    const lockHandle = await taskPersistence.tryLock(task.data!.id, 100);
    
    // 另一个尝试获取锁应失败
    const anotherLock = await taskPersistence.tryLock(task.data!.id, 100);
    expect(anotherLock.success).toBe(false);
    expect(anotherLock.error?.code).toBe("ELOCKED");
    
    await taskPersistence.unlock(lockHandle.data!);
  });
});

7.3 错误处理策略

异常场景处理矩阵

异常场景 检测方式 处理策略 用户通知 恢复机制
磁盘空间不足 ENOSPC 错误码 立即停止写入,返回错误 "磁盘空间不足,请清理后重试" 释放空间后手动重试
文件损坏 JSON 解析失败 保留损坏文件,返回错误 "任务文件损坏,正在尝试恢复" 从备份恢复或手动修复
网络分区 EIO 错误码 重试 3 次(间隔 1s),失败则返回错误 "网络异常,请检查连接" 网络恢复后自动重试
并发写入冲突 版本不匹配 返回错误,提示重试 "任务已被其他会话修改,请刷新后重试" 用户刷新后重新操作
锁超时 获取锁超时 返回错误,提示稍后重试 "系统繁忙,请稍后重试" 自动重试或用户手动重试

任务恢复机制设计

// packages/coding-agent/src/task/recovery.ts
export interface RecoveryPoint {
  id: string;
  timestamp: number;
  taskIds: number[];
  snapshot: Record<number, Task>;
}

export class TaskRecoveryManager {
  private backupDir = `${TASKS_DIR}/backups`;
  
  /**
   * 创建恢复点
   * @param taskIds 要备份的任务ID列表
   * @returns 恢复点ID
   */
  async createRecoveryPoint(taskIds: number[]): Promise<string> {
    const snapshot: Record<number, Task> = {};
    for (const id of taskIds) {
      const result = await this.persistence.load(id);
      if (result.success && result.data) {
        snapshot[id] = { ...result.data };
      }
    }
    
    const recoveryPoint: RecoveryPoint = {
      id: `rp-${Date.now()}`,
      timestamp: Date.now(),
      taskIds,
      snapshot,
    };
    
    await fs.promises.writeFile(
      `${this.backupDir}/${recoveryPoint.id}.json`,
      JSON.stringify(recoveryPoint, null, 2)
    );
    
    // 保留最近 10 个恢复点
    await this.cleanupOldBackups(10);
    
    return recoveryPoint.id;
  }
  
  /**
   * 从恢复点恢复
   * @param recoveryPointId 恢复点ID
   * @returns 恢复的任务数量
   */
  async restoreFromRecoveryPoint(recoveryPointId: string): Promise<number> {
    const backupPath = `${this.backupDir}/${recoveryPointId}.json`;
    const data = await fs.promises.readFile(backupPath, 'utf8');
    const recoveryPoint = JSON.parse(data) as RecoveryPoint;
    
    let restoredCount = 0;
    for (const task of Object.values(recoveryPoint.snapshot)) {
      const result = await this.persistence.save(task);
      if (result.success) restoredCount++;
    }
    
    return restoredCount;
  }
  
  private async cleanupOldBackups(keepCount: number): Promise<void> {
    const backups = await fs.promises.readdir(this.backupDir);
    const sorted = backups.sort().reverse();
    for (const backup of sorted.slice(keepCount)) {
      await fs.promises.unlink(`${this.backupDir}/${backup}`);
    }
  }
}

7.4 错误返回码定义

错误码 含义 所属模块 处理建议
ENOENT 任务不存在 TaskPersistence 检查任务ID是否正确
EEXIST 任务已存在 TaskPersistence 使用其他任务ID
ELOCKED 文件被锁定 TaskPersistence 稍后重试
EIO I/O 错误 TaskPersistence 检查存储设备和网络
ENOSPC 磁盘空间不足 TaskPersistence 清理磁盘空间
EINVALID 无效数据格式 TaskPersistence 检查数据格式
ECONCURRENT 并发写入冲突 TaskPersistence 刷新后重试
E_CYCLE 循环依赖 DependencyManager 调整依赖关系
E_NOT_FOUND 任务不存在 DependencyManager 检查任务ID
E_INVALID_DEP 无效依赖关系 DependencyManager 检查依赖配置

八、测试覆盖计划

8.1 测试分层策略

测试层级 覆盖范围 工具
单元测试 单个模块/函数 Vitest
集成测试 模块间协作 Vitest
端到端测试 完整工作流 Playwright
性能测试 大规模任务图 k6 / 自定义
并发测试 多会话并发访问 Vitest + 自定义
容错测试 异常场景和恢复 Vitest + 模拟

8.2 测试覆盖矩阵

迭代 1:持久化存储层

测试场景 描述 优先级 测试文件
CRUD 操作 任务创建、读取、更新、删除 persistence.test.ts
ID 自增 验证任务 ID 连续递增 persistence.test.ts
文件格式 JSON 序列化/反序列化正确性 persistence.test.ts
边界条件 空任务、超大任务描述 persistence.test.ts
并发写入冲突 多进程同时写入同一任务 persistence-concurrent.test.ts
文件锁超时 锁等待超时处理 persistence-concurrent.test.ts
磁盘空间不足 ENOSPC 错误处理 persistence-error.test.ts
网络分区 EIO 错误重试机制 persistence-error.test.ts
文件损坏恢复 JSON 解析失败处理 persistence-error.test.ts
乐观锁版本控制 版本不匹配检测 persistence-concurrent.test.ts

迭代 2:依赖图管理

测试场景 描述 优先级 测试文件
依赖添加/移除 正确维护 blockedBy 关系 dependency-manager.test.ts
自动解锁 完成任务后下游任务自动解锁 dependency-manager.test.ts
循环依赖检测 检测并报告循环依赖 dependency-manager.test.ts
拓扑排序 正确生成任务执行顺序 dependency-manager.test.ts
多种循环模式 三角形循环、链式循环等 dependency-manager.test.ts
任务恢复机制 undo 撤销操作 dependency-manager.test.ts
批量完成任务 多个任务同时完成 dependency-manager.test.ts
自依赖检测 检测并拒绝自依赖 dependency-manager.test.ts

迭代 3-7:集成测试

测试场景 描述 优先级 测试文件
会话恢复 重启会话后任务图完整恢复 task-manager.test.ts
并发访问 多个会话同时访问任务图 task-manager-concurrent.test.ts
任务执行 任务图驱动的子代理执行 task-execution.test.ts
错误恢复 部分失败时的状态一致性 task-manager.test.ts
跨会话任务引用 task:// URI 支持 session-integration.test.ts
任务图持久化 重启后状态保持 task-manager.test.ts

8.3 性能测试指标

指标 目标 测量方法 测试文件
任务加载时间 < 100ms(1000 任务) 计时加载操作 performance.test.ts
依赖图构建 < 50ms(1000 任务) 计时拓扑排序 performance.test.ts
文件写入延迟 < 10ms(单任务) 计时保存操作 performance.test.ts
并发吞吐量 > 100 ops/s 多线程并发写入 performance.test.ts
锁等待时间 < 100ms(99%) 统计锁获取延迟 performance.test.ts

8.4 容错测试场景

测试场景 模拟方式 验证目标
磁盘空间不足 使用磁盘配额限制或模拟 ENOSPC 正确返回错误码,不损坏数据
网络分区 模拟 I/O 错误或网络断开 重试机制正常工作
进程崩溃 强制终止进程 重启后数据一致性
文件损坏 手动修改 JSON 文件 正确检测并保留损坏文件
锁残留 创建锁文件后不删除 自动清理过期锁
并发冲突 多线程同时写入 乐观锁检测冲突

8.5 测试辅助工具

// packages/coding-agent/src/task/__tests__/test-utils.ts
export class MockFileSystem {
  private files: Record<string, string> = {};
  private locks: Record<string, number> = {};
  
  async writeFile(path: string, content: string, options?: { flag?: string }) {
    // 模拟磁盘空间不足
    if (Object.keys(this.files).length > 1000) {
      throw Object.assign(new Error("ENOSPC"), { code: "ENOSPC" });
    }
    
    // 模拟原子写入(wx 标志)
    if (options?.flag === 'wx' && this.files[path]) {
      throw Object.assign(new Error("EEXIST"), { code: "EEXIST" });
    }
    
    this.files[path] = content;
  }
  
  async readFile(path: string, encoding?: string): Promise<string> {
    if (!this.files[path]) {
      throw Object.assign(new Error("ENOENT"), { code: "ENOENT" });
    }
    return this.files[path];
  }
  
  // ... 其他方法
}

九、迁移路径

9.1 会话任务迁移

自动迁移(默认启用):

// 会话启动时自动检测并迁移
async function migrateSessionTasks(sessionDir: string): Promise<number> {
  const todoData = await loadSessionTodo(sessionDir);
  if (!todoData) return 0;
  
  const migratedCount = 0;
  for (const phase of todoData.phases) {
    for (const item of phase.tasks) {
      await taskManager.create(item.content, item.notes?.join("\n"));
      migratedCount++;
    }
  }
  return migratedCount;
}

迁移配置

export const MigrationConfigSchema = z.object({
  /** 是否自动迁移会话任务(默认关闭,强制手动触发) */
  autoMigrate: z.boolean().default(false),
  /** 是否保留原会话任务(迁移后删除,默认保留确保数据安全) */
  keepOriginal: z.boolean().default(true),
  /** 是否在迁移后同步完成状态 */
  syncCompleted: z.boolean().default(true),
});

9.2 迁移工作流

用户打开会话
    │
    ▼
检测到未迁移的会话任务
    │
    ▼
提示用户手动迁移
    │
    ├───────────────────────┐
    ▼                       ▼
手动迁移(默认)        自动迁移(可选)
    │                       │
    ▼                       ▼
用户确认后迁移         创建任务图任务
    │                       │
    └──────────┬────────────┘
               ▼
同步完成状态
    │
    ▼
任务图可用

十、性能评估

10.1 大规模任务图优化

优化策略

策略 实现方式 预期收益
增量加载 按需加载任务详情 减少启动时间
任务归档 自动归档已完成任务 降低内存占用
缓存机制 缓存依赖图计算结果 加速查询
异步写入 批量写入磁盘 降低 IO 压力

归档策略

export interface ArchiveConfig {
  /** 归档阈值(完成时间超过此天数的任务) */
  archiveAfterDays: number;
  /** 是否压缩归档文件 */
  compressArchive: boolean;
  /** 归档文件存储目录 */
  archiveDir: string;
}

10.2 内存占用预估

任务数量 预估内存 优化后内存
100 ~500KB ~500KB
1000 ~5MB ~3MB
10000 ~50MB ~20MB

十一、下一步行动

建议从迭代 1开始,先实现持久化存储层。每个迭代完成后:

  1. 运行单元测试验证
  2. 编写集成测试
  3. 更新文档
  4. 准备进入下一迭代

十二、适配器模式集成方案

12.1 核心设计思想

为了实现 s07 文档的核心目标——"任务图是多步工作的默认选择,Todo 仍可用于单次会话内的快速清单",本方案采用适配器模式实现新旧系统的无缝集成:

┌─────────────────────────────────────────────────────────────┐
│                    任务路由层(新增)                          │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  判断任务类型 → 选择合适的工具                        │    │
│  │  • 简单任务 → todo_write(会话级)                    │    │
│  │  • 复杂任务 → task_graph(跨会话级,默认)            │    │
│  └─────────────────────────────────────────────────────┘    │
│                          │                                 │
│          ┌───────────────┴───────────────┐                 │
│          ▼                               ▼                 │
┌───────────────────┐           ┌───────────────────────────┐ │
│   todo_write      │           │        task_graph         │ │
│ (快速清单)       │           │ (默认:多步任务管理)     │ │
└───────────────────┘           └───────────────────────────┘ │
                              │                               │
                              ▼                               │
                    ┌───────────────────┐                     │
                    │    适配器桥接层    │                     │
                    │   import / sync    │                     │
                    └───────────────────┘                     │
└─────────────────────────────────────────────────────────────┘

12.2 适配器接口设计

新增文件: packages/coding-agent/src/task/task-graph-adapter.ts

import * as z from "zod/v4";
import type { TodoPhase, TodoItem } from "../tools/todo-write";

/** 任务状态枚举(参考 swarm-extension 扩展) */
export const TaskStatus = z.enum([
  "pending",      // 待执行
  "waiting",      // 等待依赖
  "in_progress",  // 执行中
  "completed",    // 已完成
  "failed",       // 失败
  "aborted",      // 已中止
  "abandoned",    // 已放弃
]);
export type TaskStatus = z.infer<typeof TaskStatus>;

/** 任务数据结构(参考 swarm-extension 双向依赖设计) */
export const TaskSchema = z.object({
  id: z.number().describe("Unique task identifier"),
  subject: z.string().describe("Task title/summary"),
  description: z.optional(z.string().describe("Detailed task description")),
  status: TaskStatus,
  blockedBy: z.array(z.number()).describe("List of task IDs this task depends on"),
  blocks: z.array(z.number()).describe("List of task IDs blocked by this task"),
  reportsTo: z.array(z.number()).describe("List of task IDs this task reports to"),
  createdAt: z.number().describe("Unix timestamp in milliseconds"),
  updatedAt: z.number().describe("Unix timestamp in milliseconds"),
});
export type Task = z.infer<typeof TaskSchema>;

/** 任务图接口 */
export interface TaskGraph {
  create(subject: string, description?: string, blockedBy?: number[]): Task;
  get(taskId: number): Task | null;
  update(task: Task): void;
  delete(taskId: number): void;
  list(filter?: { status?: TaskStatus; blocked?: boolean }): Task[];
  complete(taskId: number): void;
  getAvailableTasks(): Task[];
  getBlockedTasks(): Task[];
  getExecutionWaves(): ExecutionWave[];
}

/** 执行波次(参考 swarm-extension/dag.ts) */
export interface ExecutionWave {
  waveIndex: number;
  taskIds: number[];
}

/**
 * 任务图适配器 - 桥接新旧任务系统
 * 核心目标:不修改现有代码,实现渐进式迁移
 */
export interface TaskGraphAdapter {
  loadFromSession(sessionDir: string): TaskGraph;
  saveToSession(sessionDir: string, graph: TaskGraph): void;
  toTodoPhases(graph: TaskGraph): TodoPhase[];
  fromTodoPhases(phases: TodoPhase[]): TaskGraph;
  importAvailableTasks(graph: TaskGraph, limit?: number): TodoPhase[];
  syncCompletedTask(graph: TaskGraph, item: TodoItem): boolean;
  syncAllTasks(graph: TaskGraph, phases: TodoPhase[]): number;
}

12.3 智能任务路由器

新增文件: packages/coding-agent/src/task/task-router.ts

/** 任务复杂度分析结果 */
export interface TaskAnalysis {
  complexity: "simple" | "complex";
  reason: string;
  estimatedSteps: number;
  hasDependencies: boolean;
}

/**
 * 任务路由器 - 根据任务特征选择合适的工具
 * 核心逻辑:复杂任务自动使用 task_graph(默认),简单任务使用 todo_write
 */
export class TaskRouter {
  /**
   * 分析任务复杂度
   * @param prompt 用户输入的任务描述
   * @returns 任务复杂度分析结果
   */
  analyzeTask(prompt: string): TaskAnalysis {
    const features = this.extractFeatures(prompt);
    
    // 判断复杂度:有依赖关系 或 步骤数 > 3 → 复杂任务
    if (features.hasDependencies || features.estimatedSteps > 3) {
      return {
        complexity: "complex",
        reason: features.hasDependencies 
          ? "任务包含依赖关系" 
          : `任务预计需要 ${features.estimatedSteps} 个步骤`,
        estimatedSteps: features.estimatedSteps,
        hasDependencies: features.hasDependencies,
      };
    }
    
    return {
      complexity: "simple",
      reason: "简单任务,适合会话内快速完成",
      estimatedSteps: features.estimatedSteps,
      hasDependencies: features.hasDependencies,
    };
  }

  /**
   * 根据任务复杂度选择合适的工具
   * @param prompt 用户输入的任务描述
   * @returns 推荐的工具名称
   */
  selectTool(prompt: string): "todo_write" | "task_graph" {
    const analysis = this.analyzeTask(prompt);
    return analysis.complexity === "complex" ? "task_graph" : "todo_write";
  }
}

12.4 统一任务工具

新增文件: packages/coding-agent/src/tools/task.ts

const TaskOp = z.enum([
  "create",           // 自动判断并创建任务
  "create_simple",    // 强制使用 todo_write
  "create_complex",   // 强制使用 task_graph
  "analyze",          // 分析任务复杂度
  "list",             // 列出所有任务(合并 todo 和 task_graph)
  "import",           // 从任务图导入可执行任务到会话
  "sync",             // 同步完成状态到任务图
]);

12.5 配置选项

修改文件: packages/coding-agent/src/config.ts

export const TaskConfigSchema = z.object({
  /** 是否启用自动任务路由 */
  autoRoute: z.boolean().default(true),
  
  /** 默认工具(当自动路由关闭时使用) */
  defaultTool: z.enum(["todo_write", "task_graph"]).default("task_graph"),
  
  /** 复杂任务的步骤阈值 */
  complexityThreshold: z.number().default(3),
  
  /** 是否自动导入任务图到会话 */
  autoImport: z.boolean().default(true),
  
  /** 是否自动同步完成状态 */
  autoSync: z.boolean().default(true),
  
  /** 最大导入任务数量 */
  importLimit: z.number().default(5),
  
  /** 执行模式(参考 swarm-extension/schema.ts) */
  executionMode: z.enum([
    "default",     // 默认模式,按依赖顺序执行
    "pipeline",    // 管道模式,支持多次迭代
    "parallel",    // 并行模式,无依赖任务同时执行
    "sequential",  // 顺序模式,按声明顺序执行
  ]).default("default"),
  
  /** pipeline 模式专属:目标迭代次数 */
  targetIterations: z.number().default(1),
  
  /** parallel 模式专属:最大并发数 */
  maxConcurrency: z.number().default(4),
});

12.6 工作流示例

场景 1:创建简单任务(自动路由到 todo_write)

用户:帮我创建一个任务:"完成文档"
助手:分析任务复杂度...简单任务(1个步骤,无依赖)
      自动选择:todo_write
结果:创建了快速任务,存储在会话中

场景 2:创建复杂任务(自动路由到 task_graph)

用户:帮我规划这个功能:第一步设计 API,第二步实现核心逻辑,第三步编写测试
助手:分析任务复杂度...复杂任务(3个步骤)
      自动选择:task_graph
结果:创建了 3 个任务,建立依赖链,自动导入第一个可用任务到会话

场景 3:列出所有任务

用户:列出所有任务
结果:
=== Session Tasks (todo_write) ===
[Quick Tasks]
  [ ] 完成文档

=== Persistent Tasks (task_graph) ===
#1 [completed] 设计 API
#2 [pending] 实现核心逻辑 (blocked by: 1)
#3 [pending] 编写测试 (blocked by: 2)

12.7 与原始设计的对比

维度 原始设计(s07-task-system.md) 适配器方案
向后兼容 ❌ 替换现有系统 ✅ 完全兼容
默认工具 task_graph task_graph(符合文档要求)
快速清单 ❌ 移除 ✅ 保留 todo_write
渐进式迁移 ❌ 一次性迁移 ✅ 支持分阶段迁移
智能路由 ❌ 无 ✅ 根据复杂度自动选择
集成方式 直接修改核心逻辑 通过适配器桥接

12.8 优势总结

优势 说明
符合文档要求 任务图是多步工作的默认选择
保留快速清单 Todo 仍可用于单次会话内的快速任务
向后兼容 不修改现有代码,原有的工具和命令保持不变
智能路由 根据任务复杂度自动选择合适的工具
渐进式迁移 支持从并行运行到完全集成的分阶段迁移
职责分离 会话级任务与跨会话任务清晰分离

12.9 可借鉴的实现参考(claude_code/src)

根据对 claude_code/src 任务系统的深入分析,以下是可直接借鉴的代码参考点:

12.9.1 任务数据模型设计

参考文件: src/utils/tasks.ts(第 76-89 行)

借鉴点: 双向依赖关系设计

// 参考:双向依赖模型
const TaskSchema = z.object({
  blocks: z.array(z.string()),       // 此任务阻塞的任务
  blockedBy: z.array(z.string()),    // 阻塞此任务的任务
});

应用建议: 在 TaskSchema 中增加 blocks 字段,实现双向依赖追踪。

12.9.2 文件持久化策略

参考文件: src/utils/tasks.ts(第 221-231 行)

借鉴点: 文件级存储方案

// 参考:任务文件路径结构
export function getTaskPath(taskListId: string, taskId: string): string {
  return join(getTasksDir(taskListId), `${taskId}.json`);
}

应用建议: 采用 ~/.omp/tasks/<taskListId>/<taskId>.json 结构存储任务。

12.9.3 并发安全机制

参考文件: src/utils/tasks.ts(第 102-108、284-308 行)

借鉴点: 文件锁 + 高水位标记

// 参考:锁配置
const LOCK_OPTIONS = {
  retries: { retries: 30, minTimeout: 5, maxTimeout: 100 },
};

// 参考:创建任务时获取锁
const release = await lockfile.lock(lockPath, LOCK_OPTIONS);
const highestId = await findHighestTaskId(taskListId);

应用建议: 使用 proper-lockfile 实现文件级锁,保证多进程安全。

12.9.4 状态管理与事件通知

参考文件: src/utils/tasks.ts(第 17-67 行)

借鉴点: 响应式状态同步

// 参考:事件信号机制
const tasksUpdated = createSignal();
export const onTasksUpdated = tasksUpdated.subscribe;
export function notifyTasksUpdated(): void {
  tasksUpdated.emit();
}

应用建议: 在 task-router.ts 中实现任务更新事件通知,支持 UI 响应式更新。

12.9.5 工具接口设计

参考文件: src/tools/TaskCreateTool/TaskCreateTool.ts(第 48-129 行)

借鉴点: 统一工具接口规范

// 参考:工具定义模式
export const TaskCreateTool = buildTool({
  name: TASK_CREATE_TOOL_NAME,
  async description() { return DESCRIPTION },
  get inputSchema() { return inputSchema() },
  get outputSchema() { return outputSchema() },
  async call({ subject, description }, context) {
    // 实现逻辑
  },
});

应用建议: 在 task.ts 统一任务工具中采用相同的模式。

12.9.6 任务列表查询

参考文件: src/tools/TaskListTool/TaskListTool.ts(第 64-90 行)

借鉴点: 依赖过滤逻辑

// 参考:过滤已完成的依赖
const resolvedTaskIds = new Set(
  allTasks.filter(t => t.status === 'completed').map(t => t.id)
);

const tasks = allTasks.map(task => ({
  ...task,
  blockedBy: task.blockedBy.filter(id => !resolvedTaskIds.has(id)),
}));

应用建议: 在 TaskGraph.getAvailableTasks() 中实现类似的依赖过滤。

12.9.7 任务更新与依赖管理

参考文件: src/tools/TaskUpdateTool/TaskUpdateTool.ts(第 300-324 行)

借鉴点: 依赖关系更新

// 参考:建立双向依赖
if (addBlocks && addBlocks.length > 0) {
  for (const blockId of newBlocks) {
    await blockTask(taskListId, taskId, blockId);
  }
}

应用建议: 在 TaskGraph.update() 中支持依赖关系的增删。

12.9.8 任务状态订阅

参考文件: src/hooks/useTasksV2.ts(第 29-200 行)

借鉴点: 响应式任务列表订阅

// 参考:文件监听 + 轮询回退
class TasksV2Store {
  #watcher: FSWatcher | null = null;
  #pollTimer: ReturnType<typeof setTimeout> | null = null;
  
  #fetch = async (): Promise<void> => {
    this.#rewatch(getTasksDir(taskListId));
    const current = await listTasks(taskListId);
    this.#tasks = current;
    this.#notify();
  };
}

应用建议: 在适配器中实现类似的任务列表订阅机制。

12.9.9 借鉴参考对照表

claude_code 文件路径 借鉴内容 适配方案应用位置
src/utils/tasks.ts 任务数据模型、双向依赖 task-graph-adapter.ts
src/utils/tasks.ts 文件锁、高水位标记 并发安全机制
src/utils/tasks.ts 事件信号机制 状态同步
src/hooks/useTasksV2.ts 文件监听、轮询回退 任务列表订阅
src/tools/TaskCreateTool.ts 工具接口规范 task.ts
src/tools/TaskUpdateTool.ts 依赖管理 TaskGraph.update()
src/tools/TaskListTool.ts 依赖过滤 TaskGraph.getAvailableTasks()

十二、Swarm-Extension DAG 功能分析与优化建议

12.10 Swarm-Extension DAG 功能概述

项目中已存在的 swarm-extension/src/swarm/dag.ts 提供了成熟的 DAG(有向无环图)实现,可作为任务图设计的重要参考:

12.10.1 核心功能

模块 文件路径 功能描述
DAG 构建 swarm/dag.ts 构建依赖图、检测循环、生成执行波次
数据模型 swarm/schema.ts Swarm 配置定义、YAML 解析
状态管理 swarm/state.ts 持久化状态追踪(.swarm_<name>/
执行器 swarm/executor.ts 执行单个 swarm agent
流程控制 swarm/pipeline.ts 编排执行波次和迭代

12.10.2 关键设计特点

双向依赖关系dag.ts 第 34-40 行):

  • 支持 waits_for(显式等待)和 reports_to(隐式依赖)两种关系定义
  • reports_to 语义:A reports_to B 意味着 B 等待 A 完成

三种执行模式schema.ts 第 27 行):

  • pipeline:管道模式,支持多次迭代(target_count
  • parallel:并行模式,所有无依赖关系的 Agent 并行执行
  • sequential:顺序模式,按声明顺序依次执行

执行波次生成dag.ts 第 106-146 行):

  • 使用 Kahn's 算法进行拓扑排序
  • 同一波次内的节点并行执行,波次间顺序执行

持久化状态追踪state.ts 第 42-127 行):

  • 持久化到 .swarm_<name>/ 目录
  • 支持断点续传和状态恢复

12.11 功能覆盖对比

功能 文档设计 Swarm-Extension 实现 优化建议
依赖关系 blocked_by 单向依赖 waits_for + reports_to 双向依赖 ✅ 采用双向依赖设计
执行模式 未提及 pipeline/parallel/sequential ✅ 引入多模式支持
拓扑排序 提及但未详述 Kahn's 算法 + 执行波次 ✅ 采用执行波次概念
循环检测 提及 Kahn's 算法 ✅ 保持一致
持久化 .tasks/task_<id>.json .swarm_<name>/ 目录结构 ✅ 扩展目录结构
状态恢复 未明确提及 完整支持断点续传 ✅ 添加状态恢复机制
迭代执行 未提及 支持多次迭代 ✅ 添加迭代支持
YAML 配置 未提及 完整支持 YAML ✅ 考虑 YAML 支持

12.12 架构优化建议

12.12.1 与 Swarm-Extension 的集成方案

建议集成策略

  • Task Graph:专注于任务管理(持久化、依赖管理、状态追踪)
  • Swarm-Extension:专注于执行编排(多 Agent 协作、波次执行、迭代)

可能的桥接方案

export class SwarmTaskBridge {
  #taskGraph: TaskGraph;
  #swarmDef: SwarmDefinition;
  
  taskGraphToSwarm(): SwarmDefinition;
  syncSwarmResult(result: PipelineResult): void;
}

12.12.2 代码复用建议

Swarm-Extension 组件 复用内容 应用位置
buildExecutionWaves 执行波次生成逻辑 dependency-manager.ts
detectCycles Kahn's 算法循环检测 dependency-manager.ts
StateTracker 持久化设计 persistence.ts
PipelineController 编排逻辑 task-manager.ts

十三、文档完备性说明

13.1 已补充的关键特性

本文档已补充 oh-my-pi 任务系统的以下关键特性:

13.1.1 todo-write 工具的 Markdown 双向转换

  • 实现位置packages/coding-agent/src/tools/todo-write.ts(行号需确认)
  • 核心功能
    • phasesToMarkdown():将任务列表渲染为标准 Markdown checklist 格式
    • markdownToPhases():解析 Markdown checklist 并恢复为任务列表
  • 支持的状态标记
    • [ ][ ]:pending(待办)
    • [x][X]:completed(已完成)
    • [/][>]:in_progress(进行中)
    • [-][~]:abandoned(已放弃)
  • 备注支持:使用 blockquote 格式(> text)附加任务备注
  • 实际价值:用户可以手动编辑 Markdown 文件来管理任务,支持版本控制和协作

13.1.2 task 工具的计划模式集成

  • 实现位置packages/coding-agent/src/task/index.ts 第 614-623 行
  • 触发条件:父会话启用计划模式(planModeState.enabled === true
  • 核心行为
    • 注入 plan-mode-subagent.md 提示词到子代理的 system prompt
    • 限制子代理只能使用只读工具:readsearchfindlspweb_search
    • 禁用子代理的 spawns 能力(防止递归调用)
  • 实际价值:在规划阶段确保子代理只进行只读分析,不修改代码

13.1.3 task 工具的模型覆盖机制

  • 实现位置packages/coding-agent/src/task/index.ts 第 626-635 行
  • 优先级顺序
    1. 设置覆盖(task.agentModelOverrides[agentName]
    2. Agent 定义中的模型(agent.model
    3. 父会话的模型(session.getActiveModelString()
  • 环境变量支持PI_BLOCKED_AGENT 防止递归调用(第 232 行)
  • 实际价值:允许为不同子代理配置不同的模型,优化成本和性能

10.1.4 task 工具的隔离执行流程

  • 实现位置packages/coding-agent/src/task/index.ts 第 670-1157 行
  • 三种隔离模式
    1. worktree 模式(第 888-889 行):
      • 创建 Git worktree 进行隔离
      • 适合 Git 仓库项目
      • 支持分支合并模式
    2. fuse-overlay 模式(第 884 行):
      • 使用 FUSE overlayfs 实现文件系统隔离
      • 适合 Linux/macOS 系统
      • 性能较好,但需要 FUSE 支持
    3. fuse-projfs 模式(第 886 行):
      • 使用 FUSE projfs 实现文件系统隔离
      • 适合 Windows 系统
      • Windows 原生支持
  • 自动清理:任务执行完成后自动清理隔离环境(第 999-1007 行)
  • 结果合并
    • patch 模式(第 1103-1151 行):生成并应用补丁
    • branch 模式(第 1073-1101 行):创建并合并分支

13.1.5 executor.ts 子代理执行器详解

  • 实现位置packages/coding-agent/src/task/executor.ts
  • 核心功能:负责在进程内运行子代理,转发 AgentEvents 进行进度追踪
  • 执行流程
    1. 初始化阶段(第 455-513 行):解析参数、检查中止状态、设置进度对象
    2. 配置阶段(第 515-560 行):设置 artifacts 路径、递归深度限制、工具列表、模型配置
    3. 事件处理阶段(第 722-910 行):注册事件监听器,处理 message_starttool_execution_start/endmessage_update/endagent_end 等事件
    4. 子代理执行(第 912-1192 行):创建会话、发送任务提示、等待空闲、处理 yield 工具调用
    5. 结果整理(第 1203-1291 行):处理输出、计算状态、生成最终结果
  • IPC 通信机制
    • 通过 EventBus 发送子代理事件到三个通道:
      • TASK_SUBAGENT_EVENT_CHANNEL:转发所有 AgentEvent
      • TASK_SUBAGENT_LIFECYCLE_CHANNEL:发送生命周期事件(started/completed/failed/aborted)
      • TASK_SUBAGENT_PROGRESS_CHANNEL:发送进度更新
    • 支持 MCP 代理工具:通过 createMCPProxyTools() 复用父会话的 MCP 连接(第 390-438 行)
  • 进度追踪:实时追踪工具调用次数、token 消耗、执行耗时(第 746-785 行)
  • Yield 工具处理:自动检测 yield 工具调用,提取任务结果(第 804-806 行)
  • 超时处理:支持 MCP 调用超时(60 秒)和中止信号处理(第 90-120 行)

13.1.6 render.ts TUI 渲染器详解

  • 实现位置packages/coding-agent/src/task/render.ts
  • 核心功能:提供 renderCallrenderResult 函数,在终端 UI 中显示任务执行状态
  • 渲染组件
    1. 状态图标(第 38-51 行):根据任务状态显示不同图标(pending/running/completed/failed/aborted)
    2. 进度渲染(第 491-629 行):renderAgentProgress() 渲染运行中的子代理进度
    3. 结果渲染(第 729-881 行):renderAgentResult() 渲染完成后的子代理结果
    4. JSON 树渲染(第 123-260 行):支持嵌套 JSON 数据的树形展示
    5. 审查结果渲染(第 634-676 行):renderReviewResult() 渲染代码审查结果(正确性、置信度、发现列表)
    6. 查找结果渲染(第 681-724 行):renderFindings() 渲染审查发现,按优先级排序
  • 扩展点
    • 通过 subprocessToolRegistry.register() 注册自定义工具渲染器(第 1005-1014 行)
    • 支持自定义 extractDatarenderFinal 方法
  • 渲染模式
    • 折叠模式:显示摘要信息,限制行数(默认 3 行)
    • 展开模式:显示完整内容,支持深度嵌套(默认 6 层)
  • 主题支持:通过 Theme 对象实现终端颜色和样式定制

13.1.7 SDK 集成说明

  • 实现位置packages/coding-agent/src/sdk/
  • 使用方式
    import { createAgentSession, discoverAuthStorage } from "@oh-my-pi/coding-agent";
    
    // 创建任务执行会话
    const authStorage = await discoverAuthStorage();
    const { session } = await createAgentSession({
      cwd: "/path/to/project",
      authStorage,
      toolNames: ["task"],
      // ... 其他配置
    });
    
    // 调用 task 工具执行子代理任务
    const result = await session.prompt("Execute task", {
      attribution: "agent",
    });
    
  • session-manager 集成
    • 会话初始化时自动加载任务图(第 592-594 行)
    • 支持 task:// URI 协议读取任务详情
  • executor 集成
    • runSubprocess() 函数是任务执行的核心入口(第 455 行)
    • 支持通过 ExecutorOptions 配置任务参数、模型覆盖、隔离模式等

13.2 API 设计风格说明

本文档的 API 设计遵循 oh-my-pi 现有代码风格:

13.2.1 使用 Zod v4 进行类型定义

// 推荐(与现有代码一致)
import * as z from "zod/v4";

export const TaskStatus = z.enum(["pending", "in_progress", "completed"]).describe("Task status");
export type TaskStatus = z.infer<typeof TaskStatus>;

export const TaskSchema = z.object({
  id: z.number().describe("Unique task identifier"),
  subject: z.string().describe("Task title/summary"),
  // ...
});
export type Task = z.infer<typeof TaskSchema>;

参考位置

  • packages/coding-agent/src/task/types.ts 第 4 行(Zod v4 导入)
  • packages/coding-agent/src/tools/todo-write.ts 第 6 行(Zod v4 导入)

13.2.2 错误处理模式

oh-my-pi 采用返回值错误处理模式,避免抛出异常:

// 推荐(返回 null 而非抛出异常)
export interface TaskPersistence {
  load(taskId: number): Task | null;
  // ...
}

// 推荐(返回结构化错误信息)
validateDAG(): { valid: boolean; cycle?: number[] };

参考位置

  • packages/coding-agent/src/task/index.ts 第 534-550 行(错误返回模式)
  • packages/coding-agent/src/task/types.ts 第 164-186 行(结构化错误定义)

工具级错误处理示例task/index.ts 第 553-570 行):

// 检查代理是否在设置中被禁用
const disabledAgents = this.session.settings.get("task.disabledAgents") as string[];
if (disabledAgents.length > 0 && disabledAgents.includes(agentName)) {
    const enabled = agents.filter(a => !disabledAgents.includes(a.name)).map(a => a.name);
    return {
        content: [{ 
            type: "text", 
            text: `Agent "${agentName}" is disabled in settings. 
                   Enable it via /agents, or use a different agent type.
                   ${enabled.length > 0 ? ` Available: ${enabled.join(", ")}` : ""}` 
        }],
        details: { projectAgentsDir, results: [], totalDurationMs: 0 },
    };
}

核心原则

  1. 工具执行失败时返回带有 contentdetails 的标准结果对象
  2. 错误信息包含在 content 数组中的 text 类型项
  3. details 保持完整结构,results 为空数组,totalDurationMs 记录耗时

13.2.3 接口方法命名

  • 使用动词开头:createloadsavedelete
  • 查询方法使用 getlist 前缀:getTasklistTasks
  • 布尔查询使用 is 前缀:isAvailableisBlocked

13.2.4 并发控制机制

oh-my-pi 通过 parallel.ts 提供并发控制能力:

Semaphore 信号量类task/parallel.ts 第 89-115 行):

export class Semaphore {
    #max: number;
    #current = 0;
    #queue: Array<() => void> = [];

    constructor(max: number) {
        this.#max = Math.max(1, max);
    }

    async acquire(): Promise<void> {
        if (this.#current < this.#max) {
            this.#current++;
            return;
        }
        const { promise, resolve } = Promise.withResolvers<void>();
        this.#queue.push(resolve);
        return promise;
    }

    release(): void {
        const next = this.#queue.shift();
        if (next) {
            next();
        } else {
            this.#current--;
        }
    }
}

mapWithConcurrencyLimit 并行执行函数task/parallel.ts 第 26-84 行):

export async function mapWithConcurrencyLimit<T, R>(
    items: T[],
    concurrency: number,
    fn: (item: T, index: number) => Promise<R>,
    signal?: AbortSignal,
): Promise<ParallelResult<R>> {
    // 创建 worker pool,限制并发数
    const workers = Array(limit).fill(null).map(() => worker());
    // ...
}

使用方式task/index.ts):

const results = await mapWithConcurrencyLimit(
    tasks,
    maxConcurrency, // 从 settings.get("task.maxConcurrency") 获取
    async (task, index) => {
        return await runSubprocess({ ...options, task });
    },
    signal,
);

并发配置:通过 task.maxConcurrency 设置控制最大并发数。

13.3 架构图补充说明

本文档第 3.3 节的架构图已更新,补充了以下细节:

13.3.1 todo-write 工具架构

  • 明确展示 Markdown 双向转换能力
  • 展示 Phase 分组管理机制
  • 说明 Session 存储方式(USER_TODO_EDIT_CUSTOM_TYPE

13.3.2 task 工具架构

  • 展示 Agent 发现机制的三层结构
  • 详细说明三种隔离执行模式
  • 展示结果合并的两种模式
  • 补充计划模式集成和模型覆盖机制

10.4 与现有代码的兼容性

本文档的设计确保与现有代码的兼容性:

方面 兼容性保证
类型系统 使用 Zod v4,与现有类型定义风格一致
错误处理 返回 null 或结构化错误,不抛出异常
命名规范 遵循现有命名约定(动词开头、驼峰命名)
文件组织 新增文件放在 packages/coding-agent/src/task/ 目录
工具集成 新增工具命令作为 task 工具的扩展,不修改现有工具
会话集成 通过接口扩展,不修改核心会话逻辑

十四、需要修改的现有代码列表

14.1 修改策略概述

为最小化对现有代码的侵入,采用独立模块 + 最小引用方案:

策略 说明 侵入性
新建独立模块 任务图功能完全封装在新文件中 零侵入
最小引用集成 原文件仅添加导入和调用,不修改核心逻辑 低侵入
配置开关控制 通过配置决定是否启用任务图功能 风险可控

14.2 需要修改的现有文件(最小侵入)

文件路径 修改目的 修改类型 侵入性 改动量
packages/coding-agent/src/task/types.ts 添加 TaskGraph、Task 等新类型定义 扩展 < 50 行
packages/coding-agent/src/task/index.ts 添加 TaskManager 导入和调用入口 扩展 < 30 行
packages/coding-agent/src/session/session-manager.ts 添加任务图加载钩子和 URI 解析方法 扩展 < 40 行
packages/coding-agent/src/config.ts 添加任务系统配置选项 扩展 < 30 行

14.3 需要新增的文件(零侵入)

文件路径 目的 职责
packages/coding-agent/src/task/persistence.ts 任务持久化层 读写 .tasks/task_{id}.json,文件锁控制
packages/coding-agent/src/task/dependency-manager.ts 依赖关系管理 依赖添加/移除、自动解锁、DAG 验证
packages/coding-agent/src/task/task-manager.ts 任务管理器核心 组合持久化和依赖管理,提供统一接口
packages/coding-agent/src/task/task-graph-adapter.ts 适配器桥接层 桥接新旧任务系统,支持数据迁移
packages/coding-agent/src/task/task-router.ts 智能任务路由器 根据任务复杂度选择工具
packages/coding-agent/src/tools/task.ts 统一任务工具入口 提供 task_create/update/list/get/complete 命令

14.4 修改详情

14.4.1 packages/coding-agent/src/task/types.ts

修改目的:添加持久化任务图所需的核心类型定义

// 新增任务状态枚举
export const TaskStatus = z.enum(["pending", "in_progress", "completed"]);
export type TaskStatus = z.infer<typeof TaskStatus>;

// 新增任务定义
export const TaskSchema = z.object({
  id: z.number().describe("Unique task identifier"),
  subject: z.string().describe("Task title/summary"),
  description: z.string().optional().describe("Detailed description"),
  status: TaskStatus,
  blockedBy: z.array(z.number()).default([]).describe("Task IDs that block this task"),
  createdAt: z.number().describe("Unix timestamp"),
  updatedAt: z.number().describe("Unix timestamp"),
  completedAt: z.number().optional().describe("Unix timestamp when completed"),
});
export type Task = z.infer<typeof TaskSchema>;

// 新增任务图定义
export interface TaskGraph {
  tasks: Map<number, Task>;
  nextId: number;
}

14.4.2 packages/coding-agent/src/task/index.ts

修改目的:添加 TaskManager 集成入口(最小侵入)

// 新增导入
import { TaskManager } from "./task-manager";

class TaskTool {
  #taskManager: TaskManager | null = null;

  async execute(
    _toolCallId: string,
    rawParams: unknown,
    signal?: AbortSignal,
    onUpdate?: AgentToolUpdateCallback<TaskToolDetails>,
  ): Promise<AgentToolResult<TaskToolDetails>> {
    const params = rawParams as TaskParams;
    
    // 初始化 TaskManager(懒加载)
    if (!this.#taskManager) {
      this.#taskManager = new TaskManager({
        tasksDir: path.join(this.session.cwd, ".tasks"),
        settings: this.session.settings,
      });
    }

    // 判断是否为任务图命令
    if (params.command === "create_task" || params.command === "complete_task") {
      return this.#executeTaskGraphCommand(params, signal, onUpdate);
    }

    // 原有执行逻辑保持不变
    // ...
  }

  /**
   * 执行任务图相关命令(新增方法,不修改原有逻辑)
   */
  async #executeTaskGraphCommand(
    params: TaskParams,
    signal?: AbortSignal,
    onUpdate?: AgentToolUpdateCallback<TaskToolDetails>,
  ): Promise<AgentToolResult<TaskToolDetails>> {
    if (!this.#taskManager) {
      return createTaskModeError("Task manager not initialized");
    }

    switch (params.command) {
      case "create_task":
        const task = this.#taskManager.create(
          params.subject,
          params.description,
          params.blockedBy,
        );
        return {
          content: [{ type: "text", text: `Created task #${task.id}: ${task.subject}` }],
          details: { projectAgentsDir: null, results: [], totalDurationMs: 0 },
        };
      
      case "complete_task":
        const completed = this.#taskManager.complete(params.taskId);
        return {
          content: [{ type: "text", text: `Completed task #${params.taskId}` }],
          details: { projectAgentsDir: null, results: [], totalDurationMs: 0 },
        };
      
      default:
        return createTaskModeError(`Unknown command: ${params.command}`);
    }
  }
}

侵入性说明

  • 仅添加导入和新方法,不修改原有 execute()#executeSync() 方法
  • 通过懒加载延迟初始化,不影响启动性能
  • 原有子代理执行功能保持完全独立

14.4.3 packages/coding-agent/src/session/session-manager.ts

修改目的:添加任务图加载钩子和 URI 解析(最小侵入)

// 新增导入
import { TaskManager } from "../task/task-manager";
import { TaskUriHandler } from "./task-uri-handler";

class SessionManager {
  #taskManager: TaskManager | null = null;

  async #initSessionFile(sessionFile: string): Promise<void> {
    await this.setSessionFile(sessionFile);
    
    // 条件加载任务图(通过配置控制)
    if (this.#shouldLoadTaskGraph()) {
      await this.#initTaskManager();
    }
  }

  /**
   * 初始化任务管理器(懒加载)
   */
  async #initTaskManager(): Promise<void> {
    try {
      const tasksDir = path.join(this.cwd, ".tasks");
      this.#taskManager = new TaskManager({ tasksDir });
      await this.#taskManager.load();
    } catch {
      // 任务图加载失败不影响会话启动
      this.#taskManager = null;
    }
  }

  /**
   * 判断是否应该加载任务图
   */
  #shouldLoadTaskGraph(): boolean {
    return this.session.settings.get("task.graph.enabled") === true;
  }

  /**
   * 解析 task:// URI(新增方法)
   */
  resolveTaskUri(uri: string): Task | undefined {
    if (!this.#taskManager) return undefined;
    return TaskUriHandler.resolve(uri, this.#taskManager);
  }

  /**
   * 获取任务管理器(新增方法)
   */
  getTaskManager(): TaskManager | null {
    return this.#taskManager;
  }
}

侵入性说明

  • 新增私有字段 #taskManager(不影响公共 API)
  • 新增方法 resolveTaskUri()getTaskManager()(扩展公共 API)
  • #initSessionFile() 中添加条件加载钩子(低侵入)
  • 任务图加载失败不影响会话启动(容错设计)

14.4.4 packages/coding-agent/src/config.ts

修改目的:添加任务系统配置选项

export const TaskConfigSchema = z.object({
  // 任务图核心配置
  graph: z.object({
    enabled: z.boolean().default(false), // 默认关闭,保持核心功能纯净
    autoLoad: z.boolean().default(true),
  }),
  
  // 迁移配置(独立模块控制)
  migration: z.object({
    enabled: z.boolean().default(false),   // 默认关闭迁移功能
    keepOriginal: z.boolean().default(true), // 保留原数据
    syncCompleted: z.boolean().default(true),
  }),
  
  // 路由配置
  router: z.object({
    autoRoute: z.boolean().default(true),
    defaultTool: z.enum(["todo_write", "task_graph"]).default("task_graph"),
    complexityThreshold: z.number().default(3),
  }),
  
  // 导入配置
  import: z.object({
    autoImport: z.boolean().default(true),
    autoSync: z.boolean().default(true),
    importLimit: z.number().default(5),
  }),
});
export type TaskConfig = z.infer<typeof TaskConfigSchema>;

侵入性说明

  • 仅添加新配置项,不修改现有配置结构
  • 默认值设计确保新环境零开销

14.5 架构设计优势

14.5.1 最小侵入架构图

┌─────────────────────────────────────────────────────────────────┐
│                        原有代码层                              │
│  ┌─────────────────────────┐    ┌───────────────────────────┐  │
│  │   task/index.ts         │    │   session-manager.ts      │  │
│  │   (原有子代理执行逻辑)    │    │   (原有会话管理逻辑)       │  │
│  └───────────┬─────────────┘    └─────────────┬───────────┘  │
│              │                                │               │
│              ▼                                ▼               │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │              桥接层(最小侵入)                           │   │
│  │  • 导入 TaskManager                                    │   │
│  │  • 添加命令分发入口                                      │   │
│  │  • 条件加载任务图                                        │   │
│  │  • 提供 URI 解析方法                                     │   │
│  └───────────────────┬─────────────────────────────────────┘   │
│                      │                                         │
│                      ▼                                         │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │              新建独立模块层(零侵入)                       │   │
│  │  ┌─────────────────────────────────────────────────┐   │   │
│  │  │  task/task-manager.ts  - 任务管理器核心         │   │   │
│  │  │  task/persistence.ts   - 持久化存储层          │   │   │
│  │  │  task/dependency-manager.ts - 依赖关系管理     │   │   │
│  │  │  task/task-graph-adapter.ts - 适配器桥接层     │   │   │
│  │  │  task/task-router.ts    - 智能任务路由         │   │   │
│  │  │  tools/task.ts          - 统一任务工具入口     │   │   │
│  │  └─────────────────────────────────────────────────┘   │   │
│  └─────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

14.5.2 侵入性对比总结

方案 修改文件数 代码改动量 侵入等级 风险
直接修改核心逻辑 2 中-高 影响现有功能
独立新文件 + 最小引用 4(修改)+ 6(新增) 低-中 风险隔离
完全独立模块 0(修改)+ 6(新增) 需要额外集成

推荐方案:独立新文件 + 最小引用

14.5.3 关键设计原则

原则 实现方式 价值
开闭原则 对扩展开放(新增模块),对修改关闭(原文件不变) 不影响现有功能
单一职责 每个新文件只负责一个功能 易于测试和维护
依赖倒置 原有代码依赖抽象接口 降低耦合度
懒加载 TaskManager 延迟初始化 不影响启动性能
容错设计 任务图加载失败不影响会话 提高健壮性

十五、API 兼容性

15.1 SDK 集成示例

15.1.1 创建任务执行会话

import { createAgentSession, discoverAuthStorage } from "@oh-my-pi/coding-agent";
import type { TaskManager, Task } from "@oh-my-pi/coding-agent";

// 创建任务执行会话
const authStorage = await discoverAuthStorage();
const { session, taskManager } = await createAgentSession({
  cwd: "/path/to/project",
  authStorage,
  toolNames: ["task", "todo_write"],
  enableTaskGraph: true, // 启用持久化任务图
});

// 使用 TaskManager API
// 创建任务
const task: Task = await taskManager.create(
  "Implement user authentication",
  "Add JWT token-based authentication with refresh token support",
  [1, 2] // blockedBy: 依赖任务 1 和 2
);

// 查询任务
const availableTasks = await taskManager.getAvailableTasks();
const blockedTasks = await taskManager.getBlockedTasks();

// 完成任务(自动解锁下游)
await taskManager.complete(task.id);

// 监听任务更新
taskManager.on("taskUpdated", (updatedTask: Task) => {
  console.log(`Task ${updatedTask.id} status changed to ${updatedTask.status}`);
});

15.1.2 会话级任务 API

// 使用 todo_write 工具创建会话级任务
const todoResult = await session.prompt(`
我需要完成以下任务:
1. 设计数据库 schema
2. 实现 API 接口
3. 编写单元测试
`, {
  attribution: "agent",
  toolPreferences: ["todo_write"], // 强制使用会话级任务
});

// 使用 task_graph 创建持久化任务图
const taskGraphResult = await session.prompt(`
帮我规划这个功能开发:
第一步:设计 API(需要先完成需求文档)
第二步:实现核心逻辑(依赖第一步)
第三步:编写测试(依赖第二步)
第四步:部署上线(依赖第三步)
`, {
  attribution: "agent",
  toolPreferences: ["task_graph"], // 强制使用持久化任务图
});

15.1.3 任务图与会话集成

// 会话启动时自动加载任务图
const session = await createAgentSession({
  cwd: "/path/to/project",
  authStorage,
  autoLoadTaskGraph: true, // 自动加载 .tasks/ 目录
  autoMigrateSessionTasks: false, // 手动迁移会话任务(需通过命令触发)
});

// 访问任务图
const taskGraph = session.getTaskGraph();

// 使用 task:// URI 读取任务详情
const task = await session.readUri("task://1");

// 监听任务状态变化
session.on("taskStatusChange", (event) => {
  console.log(`Task ${event.taskId}: ${event.oldStatus} -> ${event.newStatus}`);
});

15.1.4 子代理执行集成

import { runSubprocess, TaskExecutionOptions } from "@oh-my-pi/coding-agent";

// 执行任务图中的可用任务
const options: TaskExecutionOptions = {
  agent: "code-assistant",
  taskGraph: taskManager,
  concurrency: 2,
  isolationMode: "worktree",
  onProgress: (progress) => {
    console.log(`Task ${progress.taskId}: ${progress.status}`);
  },
};

const results = await runSubprocess(options);

// 处理执行结果
for (const result of results) {
  if (result.exitCode === 0) {
    await taskManager.complete(result.taskId);
  }
}

15.2 向后兼容性保证

API 特性 兼容性 说明
todo_write 工具 ✅ 完全兼容 原有工具保持不变
task 工具(子代理执行) ✅ 完全兼容 原有功能保持不变
会话任务存储 ✅ 完全兼容 迁移默认关闭,需手动触发
工具调用方式 ✅ 完全兼容 新增工具不影响现有调用

15.3 API 版本控制

// 版本化 API
export const TaskGraphAPIVersion = "v1";

export interface TaskGraphAPI {
  version: string;
  create: (subject: string, description?: string, blockedBy?: number[]) => Task;
  get: (taskId: number) => Task | null;
  update: (taskId: number, updates: Partial<Task>) => void;
  delete: (taskId: number) => void;
  list: (filter?: TaskFilter) => Task[];
  complete: (taskId: number) => void;
}

十六、配置迁移指南

16.1 分阶段发布策略(方案 B)

为保证功能的原子性内敛性,迁移功能采用分阶段发布方案:

发布阶段       | 功能范围                          | 迁移支持
-------------|-----------------------------------|---------
v1.0         | 核心任务图功能(持久化、依赖管理)  | ❌ 无迁移
v1.1         | 添加迁移工具和命令                  | ✅ 手动迁移
v1.2         | 完善迁移体验和验证                  | ✅ 增强迁移

设计原则

  • 原子性:迁移功能完全独立于核心任务图功能
  • 内敛性:迁移逻辑封装在独立模块,不污染核心接口
  • 风险隔离:迁移失败不影响核心功能正常使用
  • 渐进式采用:用户可选择迁移时机

16.2 迁移工作流

┌─────────────────────────────────────────────────────────────┐
│                    用户决定迁移                              │
└───────────────────┬─────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────┐
│  执行命令: omp task migrate --dry-run                      │
│  预览待迁移的会话任务                                       │
└───────────────────┬─────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────┐
│  用户确认后执行: omp task migrate                           │
└───────────────────┬─────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────┐
│  创建任务图任务(保留 phase 信息作为 description)           │
└───────────────────┬─────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────┐
│  同步完成状态(可选)                                       │
└───────────────────┬─────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────┐
│  显示迁移结果报告                                           │
└─────────────────────────────────────────────────────────────┘

16.3 迁移配置详解

export const MigrationConfigSchema = z.object({
  /** 是否启用迁移功能(默认关闭,保持核心功能纯净) */
  enabled: z.boolean().default(false),
  
  /** 是否在迁移后保留原会话任务(默认保留,确保数据安全) */
  keepOriginal: z.boolean().default(true),
  
  /** 是否在迁移后同步完成状态 */
  syncCompleted: z.boolean().default(true),
  
  /** 迁移日志级别 */
  logLevel: z.enum(["debug", "info", "warn", "error"]).default("info"),
});

配置说明

  • enabled: false:默认关闭迁移功能,确保新环境零开销
  • keepOriginal: true:默认保留原任务,防止数据丢失
  • 无自动迁移配置:迁移必须手动触发

16.4 手动迁移命令

# 查看待迁移的会话任务(预览模式,不执行迁移)
omp task migrate --dry-run

# 执行迁移(保留原任务)
omp task migrate

# 执行迁移并删除原任务(谨慎使用)
omp task migrate --remove-original

# 仅迁移已完成的任务
omp task migrate --only-completed

# 迁移指定会话的任务
omp task migrate --session <session-id>

# 查看迁移状态
omp task migrate --status

# 验证迁移结果(检查数据完整性)
omp task migrate --verify

16.5 迁移数据格式转换

会话任务字段 任务图字段 转换规则
content subject 直接映射
notes description 用换行符连接
status status 四态 → 三态(abandoned → pending)
phase.name description 前缀 添加到 description 开头
blockedBy 初始为空数组
createdAt 使用会话创建时间
updatedAt 使用当前时间

16.6 迁移模块架构

// 独立的迁移模块,不依赖核心功能启动
export class MigrationService {
  constructor(
    private taskManager: TaskManager,
    private sessionManager: SessionManager
  ) {}
  
  /**
   * 检查是否有待迁移的会话任务
   */
  async hasPendingMigration(): Promise<boolean> {
    const todoData = await this.sessionManager.loadSessionTodo();
    return todoData && todoData.phases.length > 0;
  }
  
  /**
   * 预览迁移(不执行实际迁移)
   */
  async preview(): Promise<MigrationPreview> {
    const todoData = await this.sessionManager.loadSessionTodo();
    if (!todoData) {
      return { tasks: [], total: 0, warnings: [] };
    }
    
    return {
      tasks: todoData.phases.flatMap(phase => 
        phase.tasks.map(task => ({
          content: task.content,
          status: task.status,
          phase: phase.name,
        }))
      ),
      total: todoData.phases.reduce((sum, p) => sum + p.tasks.length, 0),
      warnings: [],
    };
  }
  
  /**
   * 执行迁移
   */
  async migrate(options: MigrationOptions): Promise<MigrationResult> {
    const todoData = await this.sessionManager.loadSessionTodo();
    if (!todoData) {
      return { success: true, migrated: 0, skipped: 0, errors: [] };
    }
    
    const migrated: number[] = [];
    const errors: MigrationError[] = [];
    
    for (const phase of todoData.phases) {
      for (const item of phase.tasks) {
        try {
          const task = await this.taskManager.create(
            item.content,
            `${phase.name}: ${item.notes?.join("\n") || ""}`,
            []
          );
          migrated.push(task.id);
          
          if (options.syncCompleted && item.status === "completed") {
            await this.taskManager.complete(task.id);
          }
        } catch (err) {
          errors.push({
            content: item.content,
            error: err instanceof Error ? err.message : String(err),
          });
        }
      }
    }
    
    // 仅在成功且用户确认时删除原任务
    if (options.removeOriginal && errors.length === 0) {
      await this.sessionManager.clearSessionTodo();
    }
    
    return {
      success: errors.length === 0,
      migrated: migrated.length,
      skipped: 0,
      errors,
    };
  }
  
  /**
   * 验证迁移结果
   */
  async verify(): Promise<MigrationVerification> {
    // 对比源数据和目标数据的完整性
  }
}

16.7 迁移验证

// 迁移验证接口
export interface MigrationValidator {
  validateSource: (sessionTasks: TodoPhase[]) => MigrationValidationResult;
  validateTarget: (taskGraph: TaskGraph) => MigrationValidationResult;
  compare: (source: TodoPhase[], target: Task[]) => MigrationComparison;
}

// 验证结果
export interface MigrationValidationResult {
  valid: boolean;
  errors: string[];
  warnings: string[];
  stats: {
    totalTasks: number;
    migratedTasks: number;
    skippedTasks: number;
    failedTasks: number;
  };
}

// 迁移预览结果
export interface MigrationPreview {
  tasks: Array<{ content: string; status: TodoStatus; phase: string }>;
  total: number;
  warnings: string[];
}

// 迁移执行结果
export interface MigrationResult {
  success: boolean;
  migrated: number;
  skipped: number;
  errors: MigrationError[];
}

export interface MigrationError {
  content: string;
  error: string;
}

16.8 方案 B 的优势总结

维度 方案 B(分阶段发布) 说明
原子性 ✅ 优秀 迁移功能完全独立,可单独测试和发布
内敛性 ✅ 优秀 核心模块保持纯净,迁移逻辑封装在独立模块
风险控制 ✅ 优秀 迁移失败不影响核心功能启动
新环境开销 ✅ 零开销 默认关闭迁移,新环境无检测开销
用户体验 ✅ 可控 用户可选择迁移时机,支持预览
代码复杂度 ✅ 较低 模块职责清晰,耦合度低

十七、监控指标

17.1 核心指标

指标名称 类型 说明 单位
task.graph.tasks.total Gauge 任务图中任务总数
task.graph.tasks.pending Gauge 待执行任务数
task.graph.tasks.in_progress Gauge 执行中任务数
task.graph.tasks.completed Gauge 已完成任务数
task.graph.tasks.blocked Gauge 被阻塞任务数
task.graph.dependencies.total Gauge 依赖关系总数
task.graph.dependencies.cycles Gauge 循环依赖数

17.2 执行指标

指标名称 类型 说明 单位
task.execution.count Counter 任务执行总次数
task.execution.success Counter 成功执行次数
task.execution.failure Counter 失败执行次数
task.execution.duration Histogram 执行耗时分布 ms
task.execution.concurrency Gauge 当前并发执行数

17.3 持久化指标

指标名称 类型 说明 单位
task.persistence.read.count Counter 文件读取次数
task.persistence.write.count Counter 文件写入次数
task.persistence.lock.wait Histogram 锁等待时间 ms
task.persistence.lock.timeout Counter 锁超时次数
task.persistence.error.count Counter 持久化错误次数

17.4 监控集成示例

import { EventBus } from "@oh-my-pi/pi-utils";
import { taskGraphMetrics } from "./metrics";

// 订阅任务更新事件
EventBus.subscribe("task:updated", (event) => {
  taskGraphMetrics.tasks.total.set(event.totalTasks);
  taskGraphMetrics.tasks.pending.set(event.pendingTasks);
  taskGraphMetrics.tasks.in_progress.set(event.inProgressTasks);
  taskGraphMetrics.tasks.completed.set(event.completedTasks);
});

// 订阅执行事件
EventBus.subscribe("task:execution:completed", (event) => {
  taskGraphMetrics.execution.count.inc();
  if (event.success) {
    taskGraphMetrics.execution.success.inc();
  } else {
    taskGraphMetrics.execution.failure.inc();
  }
  taskGraphMetrics.execution.duration.observe(event.durationMs);
});

// 暴露指标(Prometheus 格式)
export function getMetrics(): string {
  return [
    `# HELP task_graph_tasks_total Total number of tasks`,
    `# TYPE task_graph_tasks_total gauge`,
    `task_graph_tasks_total ${taskGraphMetrics.tasks.total.value}`,
    // ... 其他指标
  ].join("\n");
}

17.5 告警规则

# 任务阻塞告警
groups:
  - name: task-graph-alerts
    rules:
      - alert: TaskGraphHighBlockedRatio
        expr: task_graph_tasks_blocked / task_graph_tasks_total > 0.5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "任务图阻塞率超过 50%"
          description: "当前阻塞任务数: {{ $value }}"

      - alert: TaskGraphCycleDetected
        expr: task_graph_dependencies_cycles > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "检测到循环依赖"
          description: "循环依赖数: {{ $value }}"

      - alert: TaskExecutionFailureRate
        expr: task_execution_failure / task_execution_count > 0.1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "任务执行失败率超过 10%"
          description: "失败次数: {{ $value }}"

十八、性能基准

18.1 性能测试配置

测试参数
CPU Intel i7-12700K (12 cores)
内存 32GB DDR4
存储 NVMe SSD
Node.js 20.x
测试任务数 100, 1000, 10000

18.2 性能测试结果

18.2.1 任务加载性能

任务数量 加载时间 内存占用
100 < 10ms ~500KB
1000 < 50ms ~5MB
10000 < 200ms ~50MB

18.2.2 依赖图操作性能

操作 1000 任务 10000 任务
拓扑排序 < 10ms < 50ms
依赖检测 < 5ms < 30ms
自动解锁 < 3ms < 20ms

18.2.3 持久化操作性能

操作 单任务 批量(100任务)
写入 < 5ms < 50ms
读取 < 2ms < 20ms
删除 < 3ms < 30ms

18.2.4 并发性能

并发数 吞吐量 平均延迟
1 100 ops/s 10ms
4 350 ops/s 11ms
8 500 ops/s 16ms
16 600 ops/s 27ms

18.3 性能优化策略

18.3.1 增量加载

// 按需加载任务详情
async function loadTaskSummary(taskId: number): Promise<TaskSummary> {
  // 只加载基本信息,延迟加载详情
  const summary = await this.cache.get(taskId);
  if (summary) return summary;
  
  const fullTask = await this.loadFullTask(taskId);
  const taskSummary: TaskSummary = {
    id: fullTask.id,
    subject: fullTask.subject,
    status: fullTask.status,
    blockedBy: fullTask.blockedBy.length,
  };
  
  await this.cache.set(taskId, taskSummary, { ttl: 60000 });
  return taskSummary;
}

18.3.2 任务归档

// 自动归档已完成任务
async function archiveCompletedTasks(maxAgeDays: number): Promise<number> {
  const cutoff = Date.now() - maxAgeDays * 24 * 60 * 60 * 1000;
  const completedTasks = await this.list({ status: "completed" });
  
  let archivedCount = 0;
  for (const task of completedTasks) {
    if (task.completedAt && task.completedAt < cutoff) {
      await this.archiveTask(task.id);
      await this.delete(task.id);
      archivedCount++;
    }
  }
  
  return archivedCount;
}

18.3.3 缓存机制

// 依赖图计算结果缓存
class DependencyGraphCache {
  #cache = new Map<string, DependencyGraph>();
  #ttl = 30000; // 30秒
  
  get(key: string): DependencyGraph | undefined {
    const entry = this.#cache.get(key);
    if (entry && Date.now() - entry.timestamp < this.#ttl) {
      return entry.graph;
    }
    this.#cache.delete(key);
    return undefined;
  }
  
  set(key: string, graph: DependencyGraph): void {
    this.#cache.set(key, { graph, timestamp: Date.now() });
  }
  
  invalidate(pattern: string): void {
    for (const key of this.#cache.keys()) {
      if (key.startsWith(pattern)) {
        this.#cache.delete(key);
      }
    }
  }
}

18.4 性能监控仪表板

// 性能监控面板
export interface PerformanceDashboard {
  tasks: {
    total: number;
    byStatus: Record<TaskStatus, number>;
  };
  dependencies: {
    total: number;
    cycles: number;
  };
  performance: {
    loadTimeMs: number;
    graphBuildTimeMs: number;
    writeLatencyMs: number;
    readLatencyMs: number;
  };
  concurrency: {
    current: number;
    peak: number;
    limit: number;
  };
}

文档最后更新日期:2026-05-21 补充了 API 兼容性、配置迁移指南、监控指标和性能基准等章节


附录:相关文件路径

oh-my-pi 当前任务系统文件

文件 路径 说明
todo-write 工具 packages/coding-agent/src/tools/todo-write.ts 会话级任务列表管理,支持 Markdown 双向转换
task 工具主文件 packages/coding-agent/src/task/index.ts 子代理并行执行框架,包含计划模式集成和模型覆盖机制
task 工具类型 packages/coding-agent/src/task/types.ts Zod v4 类型定义,包含 AgentProgress、SingleResult 等核心类型
task 工具执行器 packages/coding-agent/src/task/executor.ts 子代理进程内执行引擎,负责事件转发和进度追踪
task 工具渲染器 packages/coding-agent/src/task/render.ts TUI 渲染组件,支持进度、结果和审查发现的可视化
task 工具 prompt packages/coding-agent/src/prompts/tools/task.md 工具描述文档

s07 持久化任务图新增文件(计划)

文件 路径 说明
任务图适配器 packages/coding-agent/src/task/task-graph-adapter.ts 桥接新旧任务系统,支持数据转换和同步
智能任务路由器 packages/coding-agent/src/task/task-router.ts 根据任务复杂度自动选择工具
统一任务工具 packages/coding-agent/src/tools/task.ts 统一的任务创建入口,支持自动路由
任务持久化层 packages/coding-agent/src/task/persistence.ts 负责读写 .tasks/task_{id}.json 文件
依赖管理器 packages/coding-agent/src/task/dependency-manager.ts 管理任务依赖关系和自动解锁逻辑

learn-claude-code 参考文件

文件 路径
s07 实现 agents/s07_task_system.py
s07 文档 (中文) docs/zh/s07-task-system.md
s03 实现 agents/s03_todo_write.py
s04 实现 agents/s04_subagent.py

oh-my-pi 测试文件

文件 路径 说明
todo-write 测试 packages/coding-agent/test/tools/todo-write.test.ts todo-write 工具单元测试
executor 警告测试 packages/coding-agent/test/task/executor-warnings.test.ts 执行器警告处理测试
executor 提醒测试 packages/coding-agent/test/task/executor-subagent-reminders.test.ts 子代理提醒功能测试
worktree 测试 packages/coding-agent/test/task/worktree.test.ts worktree 隔离模式测试
渲染测试 packages/coding-agent/test/task/render-report-finding.test.ts 报告发现渲染测试
Windows ARM projfs 测试 packages/coding-agent/test/task/issue-949-windows-arm-projfs.test.ts Windows ARM projfs 兼容性测试

文档生成日期:2026-05-20