oh-my-pi 任务系统 s07 分析与迭代开发计划
本文档分析了 oh-my-pi 当前任务系统与 learn-claude-code s07 文档(learn-claude-code\docs\zh\s07-task-system.md)设计的差异,并提出迭代开发计划以支持持久化任务图功能。任务系统的实现可以参考“claude_code\src"下的代码实现。
一、oh-my-pi 任务系统现状分析
1.1 当前实现定位
根据对照分析,oh-my-pi 当前实现的是:
| 功能 | learn-claude-code | oh-my-pi | 实现位置 |
|---|---|---|---|
| s03: 会话级任务列表 | s03_todo_write.py |
✅ todo-write.ts |
packages/coding-agent/src/tools/todo-write.ts |
| s04: 子代理并行执行 | s04_subagent.py |
✅ task 工具 |
packages/coding-agent/src/task/index.ts |
| s07: 持久化任务图 | s07_task_system.py |
❌ 未实现 | - |
结论:oh-my-pi 缺乏跨会话的持久化任务图支持(s07)。
1.2 当前双系统架构
oh-my-pi 将任务管理拆分为两个独立工具:
1.2.1 todo-write 工具(会话级)
特性:
- 存储位置:Session 会话中(通过
USER_TODO_EDIT_CUSTOM_TYPE自定义类型) - 数据结构:分阶段(Phase)的任务列表
- 状态机:
pending → in_progress → completed → abandoned(四态) - 操作类型:
init、start、done、rm、drop、append、note - 持久化:依赖会话持久化,非独立磁盘文件
- Markdown 双向转换:支持将任务列表导出为 Markdown 格式,并从 Markdown 导入(代码第 320-438 行,实际行号需确认)
phasesToMarkdown():将任务列表渲染为 Markdown checklistmarkdownToPhases():解析 Markdown checklist 回任务列表- 支持状态标记:
[ ](pending)、[x](completed)、[/](in_progress)、[-](abandoned) - 支持任务备注的 blockquote 格式(
> text)
- 核心用途:会话内的任务列表管理和进度追踪
关键代码:
// packages/coding-agent/src/tools/todo-write.ts
export type TodoStatus = "pending" | "in_progress" | "completed" | "abandoned";
export interface TodoItem {
content: string;
status: TodoStatus;
notes?: string[]; // 备注
}
export interface TodoPhase {
name: string;
tasks: TodoItem[];
}
// Markdown 双向转换
export function phasesToMarkdown(phases: TodoPhase[]): string;
export function markdownToPhases(md: string): { phases: TodoPhase[]; errors: string[] };
1.2.2 task 工具(子代理执行)
特性:
- 核心功能:向子代理分发并行任务
- 代理发现:从
~/.omp/agent/agents/*.md和.omp/agents/*.md加载 - 执行模式:同步执行 + 异步后台执行(
async.enabled) - 隔离机制:worktree/fuse-overlay/fuse-projfs 隔离执行环境
- worktree 模式:创建 Git worktree 进行隔离(
worktree.ts) - fuse-overlay 模式:使用 FUSE overlayfs 实现文件系统隔离(
worktree.ts) - fuse-projfs 模式:使用 FUSE projfs 实现文件系统隔离(
worktree.ts) - 支持自动清理隔离环境(
task/index.ts第 1028-1029 行) - 支持两种结果合并模式:
patch(补丁合并,第 991-1000 行)和branch(分支合并,第 958-989 行)
- worktree 模式:创建 Git worktree 进行隔离(
- 并行控制:支持并发数限制(
task.maxConcurrency) - 进度追踪:通过
AgentProgress实时追踪每个子任务状态 - 计划模式集成:当父会话启用计划模式时,自动注入计划模式提示词并限制可用工具(代码第 614-623 行)
- 注入
plan-mode-subagent.md提示词 - 限制子代理只能使用只读工具:
read、search、find、lsp、web_search - 禁用子代理的 spawns 能力
- 注入
- 模型覆盖机制:支持多层级模型覆盖(代码第 626-635 行)
- 优先级:设置覆盖 > agent 定义 > 父会话模型
- 支持通过
task.agentModelOverrides配置 per-agent 模型 - 支持环境变量
PI_BLOCKED_AGENT防止递归调用(task/index.ts)
关键类型:
// packages/coding-agent/src/task/types.ts
export interface AgentProgress {
index: number;
id: string;
agent: string;
status: "pending" | "running" | "completed" | "failed" | "aborted";
recentTools: Array<{ tool: string; args: string; endMs: number }>;
tokens: number;
durationMs: number;
}
二、s07 文档设计思路
2.1 核心设计目标
根据 learn-claude-code/docs/zh/s07-task-system.md,任务系统的核心设计:
| 设计要点 | 文档描述 |
|---|---|
| 持久化 | 任务存储为磁盘上的 JSON 文件(.tasks/task_{id}.json) |
| 依赖图 | 支持 blockedBy 字段定义任务依赖关系 |
| 状态机 | pending → in_progress → completed 三态转换 |
| 自动解锁 | 完成任务时自动从依赖任务的 blockedBy 中移除 |
| 工具集 | task_create、task_update、task_list、task_get |
| 核心价值 | "状态存活于压缩之外"——任务图比任何对话都持久 |
2.2 任务图示例
.tasks/
task_1.json {"id":1, "status":"completed"}
task_2.json {"id":2, "blockedBy":[1], "status":"pending"}
task_3.json {"id":3, "blockedBy":[1], "status":"pending"}
task_4.json {"id":4, "blockedBy":[2,3], "status":"pending"}
任务图 (DAG):
+----------+
+--> | task 2 | --+
| | pending | |
+----------+ +----------+ +--> +----------+
| task 1 | | task 4 |
| completed| --> +----------+ +--> | blocked |
+----------+ | task 3 | --+ +----------+
| pending |
+----------+
顺序: task 1 必须先完成,才能开始 2 和 3
并行: task 2 和 3 可以同时执行
依赖: task 4 要等 2 和 3 都完成
状态: pending -> in_progress -> completed
三、异同对比
3.1 相同之处
| 维度 | 文档 | oh-my-pi |
|---|---|---|
| 状态追踪 | pending → in_progress → completed |
pending → running → completed(todo-write 还有 abandoned) |
| 工具化 | 提供 task_create/update/list/get | 提供 todo_write 和 task 工具 |
| 并行支持 | 任务图支持并行任务 | task 工具支持并行子代理执行 |
| 进度可视化 | 列表显示状态和依赖 | TUI 渲染任务状态和进度 |
3.2 关键差异
| 维度 | 文档思路 | oh-my-pi 实现 |
|---|---|---|
| 持久化方式 | 独立 JSON 文件(.tasks/task_{id}.json) |
会话内存储(todo-write)或临时 artifacts |
| 依赖管理 | 显式 blockedBy 数组定义依赖边 |
未实现显式依赖关系 |
| 依赖解锁 | 完成任务时自动清除下游依赖 | 未实现自动解锁机制 |
| 核心用途 | 管理跨会话的长期目标 | 管理会话内的任务列表 + 子代理分发 |
| 任务粒度 | 任意任务(可跨越多个会话) | 子代理调用(单次会话内完成) |
| 隔离执行 | 未提及 | 支持 worktree/fuse 隔离环境 |
| 异步执行 | 未提及 | 支持后台异步任务(async.enabled) |
3.3 架构差异对比
文档设计的任务图架构:
┌─────────────────────────────────────────────────────────────┐
│ .tasks/ 目录 │
├─────────────┬─────────────┬─────────────┬─────────────────┤
│ task_1.json │ task_2.json │ task_3.json │ task_4.json ... │
│ {"id":1, │ {"id":2, │ {"id":3, │ {"id":4, │
│ "status": │ "status": │ "status": │ "status": │
│ "completed"│ "pending", │ "pending", │ "pending", │
│ ...} │ "blockedBy"│ "blockedBy"│ "blockedBy" │
│ │ :[1]} │ :[1]} │ :[2,3]} │
└─────────────┴─────────────┴─────────────┴─────────────────┘
│ │ │
└───────────────┼───────────────┘
▼
依赖图自动解锁
(完成时触发)
oh-my-pi 的双系统架构:
┌─────────────────────────────────────────────────────────────────┐
│ todo-write 工具(会话级任务列表) │
├─────────────────────────────────────────────────────────────────┤
│ • Phase 分组管理 │
│ • 四态状态机:pending → in_progress → completed → abandoned │
│ • Markdown 双向转换 │
│ ├─ phasesToMarkdown(): 导出为 Markdown checklist │
│ └─ markdownToPhases(): 从 Markdown 导入任务列表 │
│ • Session 存储(USER_TODO_EDIT_CUSTOM_TYPE) │
│ • 任务备注支持(notes 字段) │
└─────────────────────────────────────────────────────────────────┘
│
▼
会话生命周期内跟踪开发进度
┌─────────────────────────────────────────────────────────────────┐
│ task 工具(子代理并行执行) │
├─────────────────────────────────────────────────────────────────┤
│ • Agent 发现机制 │
│ ├─ Bundled agents(内置) │
│ ├─ User agents(~/.omp/agent/agents/*.md) │
│ └─ Project agents(.omp/agents/*.md) │
│ • 执行模式 │
│ ├─ 同步执行(blocking) │
│ └─ 异步后台执行(async.enabled) │
│ • 隔离执行环境 │
│ ├─ worktree 模式:Git worktree 隔离 │
│ ├─ fuse-overlay 模式:FUSE overlayfs 隔离 │
│ └─ fuse-projfs 模式:FUSE projfs 隔离 │
│ • 结果合并模式 │
│ ├─ patch 模式:生成并应用补丁 │
│ └─ branch 模式:创建并合并分支 │
│ • 高级特性 │
│ ├─ 计划模式集成(只读工具限制) │
│ ├─ 模型覆盖机制(多层级优先级) │
│ ├─ 并发控制(task.maxConcurrency) │
│ └─ 进度追踪(AgentProgress 实时更新) │
└─────────────────────────────────────────────────────────────────┘
│
▼
单次会话内分发并行子任务
(支持隔离执行和结果合并)
四、设计决策分析
4.1 为什么 oh-my-pi 没有实现持久化任务图?
-
关注点分离:将任务管理拆分为两个独立功能
todo-write:专注于会话内的任务列表管理task:专注于子代理的并行执行调度
-
架构取舍:
- 文档的设计是任务持久化优先,适合长期项目追踪
- oh-my-pi 的设计是会话隔离优先,任务与会话生命周期绑定
-
实际使用场景:
- 文档的任务图适合"跨会话的大目标拆分"
- oh-my-pi 的设计更适合"单会话内的任务分发"
4.2 oh-my-pi 的创新点
| 特性 | 说明 |
|---|---|
| 隔离执行 | 通过 worktree/fuse 实现任务间的文件系统隔离 |
| 异步后台 | 支持任务后台执行,不阻塞主会话 |
| Agent 发现 | 支持用户/项目级别的自定义代理定义 |
| 进度追踪 | 实时追踪每个子任务的工具调用、token 消耗、耗时 |
| 结果合并 | 支持 patch/branch 两种模式合并隔离任务结果 |
五、总结
| 对比维度 | 文档(s07_task_system.py) | oh-my-pi |
|---|---|---|
| 持久化 | 磁盘 JSON 文件,跨会话存活 | 会话内存储,会话结束失效 |
| 依赖管理 | 显式 blockedBy + 自动解锁 |
未实现依赖关系 |
| 核心价值 | 长期任务追踪 | 子代理并行执行 |
| 状态数 | 3 态 | todo-write: 4 态;task: 5 态 |
| 隔离能力 | 无 | 支持 worktree/fuse 隔离 |
| 异步支持 | 无 | 支持后台异步执行 |
核心结论:oh-my-pi 没有直接实现文档中的"持久化任务图"概念,而是将任务管理拆分为两个更专注的工具:
todo-write:会话内的结构化任务列表(类似 s03 的 TodoManager 升级版)task:子代理并行执行框架(全新功能,文档未提及)
这种设计更符合实际使用场景——短期会话内的任务追踪和并行执行,而非跨会话的长期目标管理。
六、支持 s07 持久化任务图的迭代开发计划
迭代依赖关系图
迭代 1 (持久化层)
│
▼
迭代 2 (依赖管理) ───┐
│ │
▼ │
迭代 3 (TaskManager) ◄┘
│
├──────────────┬──────────────┐
▼ ▼ ▼
迭代 4 (工具) 迭代 5 (TUI) 迭代 6 (执行集成)
│ │ │
└──────────────┴──────────────┘
│
▼
迭代 7 (会话集成)
推荐实施顺序
Phase 1 - 基础层(迭代 1-3)
- 先实现核心数据结构和业务逻辑
- 不急于暴露给用户,确保稳定性
Phase 2 - 工具层(迭代 4-5)
- 提供用户可交互的工具命令
- 可视化增强用户体验
Phase 3 - 集成层(迭代 6-7)
- 与现有功能深度集成
- 实现完整的跨会话工作流
迭代 1:持久化存储层(Task Persistence Layer)
目标:实现任务文件的 CRUD 操作,不改变现有工具行为
范围:
- 新增
packages/coding-agent/src/task/persistence.ts - 实现
TaskPersistence类,负责读写.tasks/task_{id}.json - 支持任务 ID 生成、加载、保存
核心接口:
// 参考 packages/coding-agent/src/task/types.ts 的类型定义风格
import * as z from "zod/v4";
/** 任务状态枚举(参考 swarm-extension 扩展) */
export const TaskStatus = z.enum([
"pending", // 待执行
"waiting", // 等待依赖(新增,参考 swarm-extension/state.ts)
"in_progress", // 执行中
"completed", // 已完成
"failed", // 失败(新增,参考 swarm-extension/state.ts)
"aborted", // 已中止(新增,参考 swarm-extension/state.ts)
"abandoned", // 已放弃(从 todo-write 继承)
]).describe("Task status");
export type TaskStatus = z.infer<typeof TaskStatus>;
/** 任务数据结构(参考 swarm-extension 双向依赖设计) */
export const TaskSchema = z.object({
id: z.number().describe("Unique task identifier"),
subject: z.string().describe("Task title/summary"),
description: z.optional(z.string().describe("Detailed task description")),
status: TaskStatus,
blockedBy: z.array(z.number()).describe("List of task IDs this task depends on"),
blocks: z.array(z.number()).describe("List of task IDs blocked by this task (双向依赖,参考 swarm-extension/dag.ts)"),
reportsTo: z.array(z.number()).describe("List of task IDs this task reports to (隐式依赖,参考 swarm-extension/dag.ts)"),
createdAt: z.number().describe("Unix timestamp in milliseconds"),
updatedAt: z.number().describe("Unix timestamp in milliseconds"),
});
export type Task = z.infer<typeof TaskSchema>;
/** 任务过滤器 */
export const TaskFilterSchema = z.object({
status: z.optional(TaskStatus),
blocked: z.optional(z.boolean().describe("Filter by blocked status")),
});
export type TaskFilter = z.infer<typeof TaskFilterSchema>;
/** 持久化错误类型 */
export const TaskPersistenceErrorSchema = z.object({
code: z.enum([
"ENOENT", // 任务不存在
"EEXIST", // 任务已存在
"ELOCKED", // 文件被锁定
"EIO", // I/O 错误
"ENOSPC", // 磁盘空间不足
"EINVALID", // 无效数据格式
"ECONCURRENT", // 并发写入冲突
]).describe("错误码"),
message: z.string().describe("错误描述"),
taskId: z.optional(z.number()).describe("相关任务ID"),
cause: z.optional(z.string()).describe("根本原因"),
});
export type TaskPersistenceError = z.infer<typeof TaskPersistenceErrorSchema>;
/** 持久化操作结果 */
export type PersistenceResult<T> =
| { success: true; data: T }
| { success: false; error: TaskPersistenceError };
/** 持久化接口 */
export interface TaskPersistence {
/**
* 创建新任务
* @param subject 任务标题
* @param description 任务描述(可选)
* @param blockedBy 依赖的任务ID列表(可选)
* @returns 创建的任务或错误信息
*/
create(subject: string, description?: string, blockedBy?: number[]): PersistenceResult<Task>;
/**
* 加载任务
* @param taskId 任务ID
* @returns 任务对象或错误信息,任务不存在时返回 null
*/
load(taskId: number): PersistenceResult<Task | null>;
/**
* 保存任务(带文件锁保护)
* @param task 任务对象
* @param timeoutMs 锁等待超时时间(默认 5000ms)
* @returns 保存结果或错误信息
*/
save(task: Task, timeoutMs?: number): PersistenceResult<void>;
/**
* 列出任务
* @param filter 过滤条件(可选)
* @returns 任务列表或错误信息
*/
list(filter?: TaskFilter): PersistenceResult<Task[]>;
/**
* 删除任务
* @param taskId 任务ID
* @returns 删除结果或错误信息
*/
delete(taskId: number): PersistenceResult<void>;
/**
* 获取下一个可用的任务ID
* @returns 下一个任务ID或错误信息
*/
getNextId(): PersistenceResult<number>;
/**
* 尝试获取文件锁
* @param taskId 任务ID
* @param timeoutMs 超时时间
* @returns 锁句柄或错误信息
*/
tryLock(taskId: number, timeoutMs?: number): PersistenceResult<number>;
/**
* 释放文件锁
* @param lockHandle 锁句柄
*/
unlock(lockHandle: number): void;
}
设计说明:
- 使用 Zod v4 进行类型定义,与现有代码风格保持一致(参考
task/types.ts和tools/todo-write.ts) - 使用
z.enum()定义枚举类型 - 使用
z.object()定义复杂对象 - 使用
.describe()添加字段描述,支持 OpenAPI 文档生成 - 错误处理:定义统一的
TaskPersistenceError错误类型,包含错误码和详细描述 - 并发控制:使用
fs.promises.lock()实现文件级锁,支持超时机制 - 原子写入:使用
writeFile的flag: 'wx'确保原子性,配合文件锁防止并发写入 - 接口契约:每个方法都有明确的 JSDoc 注释,定义输入参数和返回值语义
文件锁实现细节:
// packages/coding-agent/src/task/persistence.ts - 文件锁实现
async function acquireLock(taskId: number, timeoutMs: number): Promise<number> {
const lockPath = `${TASKS_DIR}/task_${taskId}.lock`;
const startTime = Date.now();
while (Date.now() - startTime < timeoutMs) {
try {
// 使用 wx 标志确保只创建新文件(原子操作)
const fd = await fs.promises.open(lockPath, 'wx');
await fd.close();
return taskId; // 返回锁句柄
} catch (err) {
if (err.code === 'EEXIST') {
// 锁已存在,等待后重试
await new Promise(resolve => setTimeout(resolve, 50));
continue;
}
throw err;
}
}
throw new Error(`Timeout waiting for lock on task ${taskId}`);
}
async function releaseLock(taskId: number): Promise<void> {
const lockPath = `${TASKS_DIR}/task_${taskId}.lock`;
await fs.promises.unlink(lockPath).catch(() => {});
}
异常场景处理:
| 异常场景 | 检测方式 | 处理策略 |
|---|---|---|
| 磁盘空间不足 | ENOSPC 错误码 |
返回 TaskPersistenceError,提示用户清理空间 |
| 文件损坏 | JSON 解析失败 | 返回 EINVALID 错误,保留损坏文件供后续分析 |
| 网络分区 | EIO 错误码 |
重试 3 次后返回错误,记录日志 |
| 并发写入冲突 | 锁获取失败 | 返回 ECONCURRENT 错误,建议重试 |
验收标准:
- 单元测试覆盖 CRUD 操作
- 任务文件正确序列化到
.tasks/目录 - 重启后能正确加载已有任务
- 任务 ID 自增逻辑正确
- 类型定义通过 TypeScript 编译检查
- 并发写入冲突测试(多进程同时写入同一任务)
- 文件锁超时测试
- 磁盘空间不足场景测试
- 网络分区场景测试(模拟 I/O 错误)
内聚性:仅负责文件 I/O,不涉及业务逻辑
迭代 2:依赖图管理(Dependency Graph Manager)
目标:实现任务依赖关系的管理和自动解锁
范围:
- 新增
packages/coding-agent/src/task/dependency-manager.ts - 实现依赖添加、移除、验证
- 实现完成任务时的自动解锁逻辑
核心接口:
// 参考 packages/coding-agent/src/task/types.ts 的类型定义风格
import * as z from "zod/v4";
/** 依赖图节点 */
export const DependencyNodeSchema = z.object({
taskId: z.number().describe("Task ID"),
dependsOn: z.array(z.number()).describe("Task IDs this task depends on"),
blocks: z.array(z.number()).describe("Task IDs blocked by this task"),
});
export type DependencyNode = z.infer<typeof DependencyNodeSchema>;
/** 执行模式(参考 swarm-extension/schema.ts) */
export const ExecutionModeSchema = z.enum([
"default", // 默认模式,按依赖顺序执行
"pipeline", // 管道模式,支持多次迭代
"parallel", // 并行模式,无依赖任务同时执行
"sequential", // 顺序模式,按声明顺序执行
]).describe("Task execution mode");
export type ExecutionMode = z.infer<typeof ExecutionModeSchema>;
/** 执行波次(参考 swarm-extension/dag.ts buildExecutionWaves) */
export const ExecutionWaveSchema = z.object({
waveIndex: z.number().describe("波次序号"),
taskIds: z.array(z.number()).describe("本波次可并行执行的任务ID列表"),
});
export type ExecutionWave = z.infer<typeof ExecutionWaveSchema>;
/** 依赖管理错误类型 */
export const DependencyErrorSchema = z.object({
code: z.enum([
"E_CYCLE", // 循环依赖
"E_NOT_FOUND", // 任务不存在
"E_INVALID_DEP", // 无效的依赖关系(自依赖等)
"E_LOCKED", // 任务被锁定
]).describe("错误码"),
message: z.string().describe("错误描述"),
taskId: z.optional(z.number()).describe("相关任务ID"),
relatedTaskId: z.optional(z.number()).describe("相关依赖任务ID"),
cycle: z.optional(z.array(z.number())).describe("循环路径(仅循环依赖错误)"),
});
export type DependencyError = z.infer<typeof DependencyErrorSchema>;
/** 依赖管理操作结果 */
export type DependencyResult<T> =
| { success: true; data: T }
| { success: false; error: DependencyError };
/** 循环依赖错误 */
export const CycleDetectedErrorSchema = z.object({
cycle: z.array(z.number()).describe("Task IDs forming a cycle"),
message: z.string().describe("错误描述"),
});
export type CycleDetectedError = z.infer<typeof CycleDetectedErrorSchema>;
/** 依赖管理器接口 */
export interface DependencyManager {
/**
* 添加依赖关系
* @param taskId 依赖方任务ID
* @param dependsOnTaskId 被依赖方任务ID
* @returns 添加结果或错误信息
* @throws E_CYCLE 如果添加后形成循环依赖
* @throws E_NOT_FOUND 如果任一任务不存在
* @throws E_INVALID_DEP 如果尝试添加自依赖
*/
addDependency(taskId: number, dependsOnTaskId: number): DependencyResult<void>;
/**
* 移除依赖关系
* @param taskId 依赖方任务ID
* @param dependsOnTaskId 被依赖方任务ID
* @returns 移除结果或错误信息
* @throws E_NOT_FOUND 如果任务不存在或依赖关系不存在
*/
removeDependency(taskId: number, dependsOnTaskId: number): DependencyResult<void>;
/**
* 完成任务并自动解锁下游任务
* @param taskId 任务ID
* @returns 解锁的下游任务列表或错误信息
* @throws E_NOT_FOUND 如果任务不存在
*/
completeTask(taskId: number): DependencyResult<Task[]>;
/**
* 获取被阻塞的任务(blockedBy 非空)
* @returns 被阻塞的任务列表
*/
getBlockedTasks(): Task[];
/**
* 获取可执行的任务(pending 且无依赖)
* @returns 可执行的任务列表
*/
getAvailableTasks(): Task[];
/**
* 验证依赖图是否为 DAG(无环)
* @returns 验证结果,包含是否有效和循环路径(如果存在)
*/
validateDAG(): { valid: boolean; cycle?: number[] };
/**
* 获取依赖图拓扑排序
* @returns 拓扑排序后的任务ID数组,如果存在循环则返回 null
*/
getTopologicalOrder(): number[] | null;
/**
* 生成执行波次(参考 swarm-extension/dag.ts buildExecutionWaves)
* @returns 执行波次数组,同一波次内的任务可并行执行
*/
getExecutionWaves(): ExecutionWave[];
/**
* 获取当前可执行的任务(pending/waiting 且所有依赖已完成)
* @param completedTaskIds 已完成的任务ID集合
* @returns 当前可执行的任务列表
*/
getReadyTasks(completedTaskIds: Set<number>): Task[];
/**
* 设置执行模式
* @param mode 执行模式
*/
setExecutionMode(mode: ExecutionMode): void;
/**
* 获取任务的所有依赖(包括间接依赖)
* @param taskId 任务ID
* @returns 依赖的任务ID列表(去重)
* @throws E_NOT_FOUND 如果任务不存在
*/
getAllDependencies(taskId: number): DependencyResult<number[]>;
/**
* 获取任务的所有下游任务(包括间接下游)
* @param taskId 任务ID
* @returns 下游任务ID列表(去重)
* @throws E_NOT_FOUND 如果任务不存在
*/
getAllDownstream(taskId: number): DependencyResult<number[]>;
/**
* 获取任务的直接依赖数
* @param taskId 任务ID
* @returns 直接依赖数量
*/
getDirectDependencyCount(taskId: number): number;
/**
* 检查任务是否被指定任务阻塞
* @param taskId 任务ID
* @param blockerId 阻塞者任务ID
* @returns 是否被阻塞
*/
isBlockedBy(taskId: number, blockerId: number): boolean;
/**
* 批量完成多个任务
* @param taskIds 任务ID列表
* @param cascade 是否级联解锁下游任务(默认 true)
* @returns 完成的任务列表和解锁的任务列表
*/
completeTasks(taskIds: number[], cascade?: boolean): DependencyResult<{
completed: Task[];
unlocked: Task[];
}>;
}
设计说明:
- 使用 Zod v4 定义数据结构,与现有代码风格一致
- 错误处理:定义统一的
DependencyError错误类型,包含错误码和详细描述 validateDAG()返回结构化错误信息而非抛出异常- 添加
getTopologicalOrder()支持拓扑排序(参考现有并行执行逻辑) - 添加
getExecutionWaves()支持执行波次生成(参考 swarm-extension/dag.ts) - 添加
getAllDependencies()和getAllDownstream()支持深度依赖查询 - 添加
getDirectDependencyCount()、isBlockedBy()、completeTasks()等辅助方法 - 接口方法设计参考
task/index.ts的错误处理模式 - 每个方法都有明确的 JSDoc 注释,定义输入参数、返回值和可能的错误情况
执行波次实现(参考 swarm-extension/dag.ts):
// packages/coding-agent/src/task/dependency-manager.ts - 执行波次生成
function buildExecutionWaves(deps: Map<number, Set<number>>): ExecutionWave[] {
const waves: ExecutionWave[] = [];
const completed = new Set<number>();
const remaining = new Set(deps.keys());
while (remaining.size > 0) {
const wave: number[] = [];
// 找出所有依赖已满足的节点
for (const node of remaining) {
const nodeDeps = deps.get(node)!;
let ready = true;
for (const dep of nodeDeps) {
if (!completed.has(dep)) {
ready = false;
break;
}
}
if (ready) {
wave.push(node);
}
}
if (wave.length === 0) {
// 无法继续,存在循环依赖或孤立节点
break;
}
waves.push({ waveIndex: waves.length, taskIds: wave });
for (const node of wave) {
remaining.delete(node);
completed.add(node);
}
}
return waves;
}
任务恢复机制:
// packages/coding-agent/src/task/dependency-manager.ts - 任务恢复
interface RecoveryState {
taskId: number;
originalStatus: TaskStatus;
blockedByBefore: number[];
timestamp: number;
}
class DependencyManagerImpl implements DependencyManager {
private recoveryLog: RecoveryState[] = [];
/**
* 记录任务状态变更前的状态(用于恢复)
*/
private logRecoveryState(task: Task, blockedByBefore: number[]): void {
this.recoveryLog.push({
taskId: task.id,
originalStatus: task.status,
blockedByBefore,
timestamp: Date.now(),
});
// 保留最近 100 条恢复记录
if (this.recoveryLog.length > 100) {
this.recoveryLog.shift();
}
}
/**
* 撤销最近的操作
* @param steps 撤销步数(默认 1)
* @returns 撤销的任务数量
*/
undo(steps: number = 1): number {
let undoneCount = 0;
for (let i = 0; i < steps && this.recoveryLog.length > 0; i++) {
const state = this.recoveryLog.pop()!;
const task = this.taskPersistence.load(state.taskId);
if (task.success && task.data) {
task.data.status = state.originalStatus;
task.data.blockedBy = [...state.blockedByBefore];
this.taskPersistence.save(task.data);
undoneCount++;
}
}
return undoneCount;
}
/**
* 清理过期的恢复日志
* @param maxAgeMs 最大保留时间(默认 1 小时)
*/
cleanupRecoveryLog(maxAgeMs: number = 3600000): void {
const cutoff = Date.now() - maxAgeMs;
this.recoveryLog = this.recoveryLog.filter(r => r.timestamp > cutoff);
}
}
验收标准:
- 单元测试验证依赖添加/移除
- 完成任务后,下游任务的
blockedBy自动移除该任务 - 检测循环依赖并返回循环路径
- 正确识别可执行任务
- 拓扑排序结果正确
- 循环依赖检测测试(多种循环模式)
- 任务恢复机制测试(undo 操作)
- 批量完成任务测试
- 自依赖检测测试
内聚性:仅处理依赖逻辑,不涉及文件 I/O
迭代 3:TaskManager 整合层(低侵入)
目标:整合持久化和依赖管理,提供统一的任务管理接口
范围:
- 新增
packages/coding-agent/src/task/task-manager.ts(完全独立模块) - 组合
TaskPersistence和DependencyManager - 不修改现有
task/index.ts
核心接口:
export class TaskManager {
private persistence: TaskPersistence;
private dependencyManager: DependencyManager;
// CRUD(委托模式,不直接实现)
create(subject: string, description?: string, blockedBy?: number[]): Task;
update(taskId: number, updates: Partial<Task>): Task;
complete(taskId: number): void;
get(taskId: number): Task;
list(filter?: TaskFilter): Task[];
// 依赖查询
getAvailableTasks(): Task[];
getBlockedTasks(): Task[];
getDependencyGraph(): DependencyGraph;
// 单例访问
static getInstance(): TaskManager;
}
设计原则:
- ✅ 零侵入:不修改现有
task/index.ts - ✅ 单例模式:全局唯一实例,便于集成
- ✅ 委托模式:业务逻辑委托给底层模块
- ✅ 依赖注入:支持测试时注入 mock 实现
验收标准:
- 单元测试覆盖所有接口
- 与现有
todo-write工具共存,不冲突 - 支持任务状态转换验证(不能从 completed 回到 pending)
- 集成测试验证完整工作流
内聚性:业务逻辑层,协调持久化和依赖管理
迭代 4:新增 task 工具命令(低侵入)
目标:扩展 task 工具,支持 s07 的任务管理命令
范围:
- 新增
packages/coding-agent/src/tools/task-graph.ts(完全独立工具) - 不修改现有
task/types.ts和task/index.ts - 通过适配器模式桥接新旧工具
新增工具命令:
// task_graph_create
{ name: "task_graph_create", params: { subject: string, description?: string, blockedBy?: number[] } }
// task_graph_update
{ name: "task_graph_update", params: { taskId: number, status?: TaskStatus, addBlockedBy?: number[], removeBlockedBy?: number[] } }
// task_graph_list
{ name: "task_graph_list", params: { filter?: "all" | "pending" | "blocked" | "completed" } }
// task_graph_get
{ name: "task_graph_get", params: { taskId: number } }
// task_graph_complete
{ name: "task_graph_complete", params: { taskId: number } }
设计原则:
- ✅ 零侵入:不修改现有
task/types.ts和task/index.ts - ✅ 独立工具:新功能作为独立工具实现
- ✅ 命令隔离:使用
task_graph_前缀区分新旧命令 - ✅ 适配器桥接:通过
TaskGraphAdapter连接任务图和会话
验收标准:
- 每个工具命令有独立的单元测试
- 工具描述文档更新(
prompts/tools/task-graph.md) - TUI 渲染器支持新工具的展示
- 与现有
task工具共存,命令不冲突
内聚性:工具层,仅负责参数验证和调用 TaskManager
迭代 5:TUI 渲染器增强
目标:在终端界面中可视化任务图和依赖关系
范围:
- 修改
packages/coding-agent/src/task/render.ts - 新增任务板(Task Board)视图
- 支持依赖关系可视化
渲染模式:
任务板视图:
┌─────────────────────────────────────────┐
│ 待办 (Pending) │
├─────────────────────────────────────────┤
│ [ ] #1: Setup project │
│ [ ] #2: Write code (blocked by: 1) │
│ [ ] #3: Write tests (blocked by: 1) │
│ [ ] #4: Deploy (blocked by: 2, 3) │
└─────────────────────────────────────────┘
依赖图视图:
┌──────────┐
│ Task 1 │
│ [✓] │
└────┬─────┘
│
┌────┴─────┐
▼ ▼
┌──────────┐ ┌──────────┐
│ Task 2 │ │ Task 3 │
│ [>] │ │ [ ] │
└────┬─────┘ └────┬─────┘
│ │
└────────┘
│
▼
┌──────────┐
│ Task 4 │
│ [ ] │
└──────────┘
验收标准:
- 支持折叠/展开依赖关系
- 不同状态用不同颜色标识
- 支持显示任务阻塞原因
内聚性:仅负责 UI 渲染,不涉及业务逻辑
迭代 6:与现有 task 工具集成(低侵入)
目标:将持久化任务图与现有子代理执行功能结合
范围:
- 新增
packages/coding-agent/src/task/task-graph-executor.ts(独立执行器) - 不修改现有
task/executor.ts - 通过可选参数和回调实现集成
集成点:
// 新增独立执行器(不修改现有 executor.ts)
export class TaskGraphExecutor {
static async executeAvailableTasks(agent: string): Promise<void> {
const taskManager = TaskManager.getInstance();
const availableTasks = taskManager.getAvailableTasks();
for (const task of availableTasks) {
taskManager.update(task.id, { status: "in_progress" });
// 使用现有 runSubprocess,通过回调追踪状态
const result = await runSubprocess({
agent,
task: task.subject,
onStatusChange: (status) => {
if (status === "completed") {
taskManager.complete(task.id); // 自动解锁下游
}
}
});
if (result.exitCode !== 0) {
taskManager.update(task.id, { status: "failed" });
}
}
}
}
设计原则:
- ✅ 零侵入:不修改现有
executor.ts - ✅ 独立模块:新执行逻辑封装在独立类中
- ✅ 回调机制:通过
onStatusChange回调追踪任务状态 - ✅ 组合模式:复用现有
runSubprocess而不修改它
验收标准:
- 集成测试验证完整执行流程
- 任务执行失败时不标记为完成
- 支持批量执行多个可用任务
- 不影响现有
runSubprocess功能
内聚性:执行协调层,连接任务图和子代理执行
迭代 7:会话持久化集成(低侵入)
目标:任务图在会话重启后仍然可用
范围:
- 复用现有
session-manager.ts的CustomEntry机制 - 不修改
session-manager.ts核心逻辑 - 通过适配器模式实现任务图持久化
集成点:
// 新增独立持久化适配器(不修改 session-manager.ts)
export class TaskGraphSessionAdapter {
private static CUSTOM_TYPE = "task-graph-v1";
static async saveToSession(sessionManager: SessionManager, graph: TaskGraph): Promise<void> {
// 使用现有 CustomEntry 机制存储任务图
await sessionManager.appendCustomEntry(TaskGraphSessionAdapter.CUSTOM_TYPE, {
graph,
timestamp: Date.now(),
});
}
static loadFromSession(sessionManager: SessionManager): TaskGraph | null {
// 从 CustomEntry 加载任务图
const entries = sessionManager.getEntries();
const graphEntry = entries
.filter(e => e.type === "custom" && e.customType === TaskGraphSessionAdapter.CUSTOM_TYPE)
.pop();
return graphEntry?.data?.graph ?? null;
}
// 支持 task:// URI 读取任务(通过 URL 处理器扩展)
static handleTaskUri(url: URL): Task | null {
const taskId = parseInt(url.pathname.slice(1));
const taskManager = TaskManager.getInstance();
return taskManager.get(taskId);
}
}
设计原则:
- ✅ 零侵入:不修改现有
session-manager.ts - ✅ 复用机制:利用
CustomEntry存储任务图数据 - ✅ 适配器模式:通过独立适配器连接会话和任务图
- ✅ 解耦设计:任务图生命周期与会话解耦
验收标准:
- 会话重启后任务图完整恢复
- 支持
read task://1读取任务详情 - 任务状态与会话状态解耦
- 不影响现有会话持久化机制
内聚性:会话适配层,负责任务图与会话的桥接
六、低侵入性重构方案总结
6.1 总体架构设计
采用适配器模式 + 独立模块的策略,将任务图功能隔离在独立模块中:
┌─────────────────────────────────────────────────────────────┐
│ 新增独立模块 │
│ task/task-graph/ │
│ ├── types.ts # 任务图类型定义(独立于现有types) │
│ ├── persistence.ts # 任务图持久化逻辑 │
│ ├── dependency.ts # 依赖管理与DAG处理 │
│ ├── manager.ts # 任务图管理器(核心) │
│ ├── executor.ts # 任务图执行器 │
│ └── session-adapter.ts # 会话适配器 │
└─────────────────────────────────────────────────────────────┘
↓ 适配器层
┌─────────────────────────────────────────────────────────────┐
│ 现有代码(最小修改) │
│ task/types.ts ← 扩展接口(不修改原有定义) │
│ task/index.ts ← 注入适配器(单点接入) │
│ task/executor.ts ← 新增可选参数(不修改核心逻辑) │
│ session-manager.ts ← 复用 CustomEntry(零侵入) │
└─────────────────────────────────────────────────────────────┘
6.2 侵入性对比表
| 文件 | 原有修改方式 | 低侵入方案 | 侵入程度 |
|---|---|---|---|
task/types.ts |
修改/扩展现有接口 | 新增独立类型文件 | 零侵入 |
task/index.ts |
修改核心函数逻辑 | 包装模式 + 条件注入 | 极低 |
task/executor.ts |
修改执行流程 | 可选参数 + 回调 | 低 |
session-manager.ts |
新增 Entry 类型 | 复用 CustomEntry | 零侵入 |
6.3 关键设计原则
| 原则 | 说明 |
|---|---|
| 开闭原则 | 对扩展开放(新增模块),对修改关闭(不改动现有代码) |
| 单一职责 | 任务图逻辑集中在独立模块,现有模块保持原有职责 |
| 依赖倒置 | 任务图模块依赖抽象接口,不依赖具体实现 |
| 渐进式引入 | 通过配置开关控制启用,支持灰度发布 |
6.4 新增文件清单
| 文件路径 | 功能描述 | 状态 |
|---|---|---|
task/task-graph/types.ts |
任务图类型定义 | 新增 |
task/task-graph/persistence.ts |
持久化存储层 | 新增 |
task/task-graph/dependency.ts |
依赖图管理 | 新增 |
task/task-graph/manager.ts |
任务图管理器 | 新增 |
task/task-graph/executor.ts |
任务图执行器 | 新增 |
task/task-graph/session-adapter.ts |
会话适配器 | 新增 |
tools/task-graph.ts |
任务图工具命令 | 新增 |
6.5 优势总结
| 优势 | 说明 |
|---|---|
| 风险隔离 | 新功能失败不影响现有任务执行 |
| 易于回滚 | 只需移除新模块,无需恢复修改 |
| 代码清晰 | 职责边界明确,易于维护 |
| 测试友好 | 新模块可独立测试,不影响原有测试套件 |
| 渐进交付 | 支持灰度发布和 A/B 测试 |
七、风险与缓解
7.1 风险矩阵
| 风险类型 | 风险描述 | 严重程度 | 发生概率 | 缓解措施 |
|---|---|---|---|---|
| 并发冲突 | 多个会话同时写入同一任务文件 | 高 | 中 | 文件锁 + 原子写入 + 乐观锁 |
| 数据损坏 | 磁盘空间不足、网络分区导致写入中断 | 高 | 低 | 原子写入 + 备份机制 |
| 性能瓶颈 | 任务图过大影响加载和查询性能 | 中 | 中 | 增量加载 + 任务归档 |
| 循环依赖 | 依赖图中存在循环导致死锁 | 中 | 低 | DAG 验证 + 早期检测 |
| 数据丢失 | 异常退出导致未保存的修改丢失 | 中 | 低 | 自动保存 + 定期备份 |
7.2 并发控制方案
文件锁实现:
- 使用
fs.promises.open()的wx标志创建锁文件(原子操作) - 锁文件路径:
.tasks/task_{id}.lock - 支持可配置的超时机制(默认 5000ms)
- 自动清理过期锁(超过 30 秒未释放视为失效)
乐观锁机制:
// 乐观锁版本控制
export const TaskWithVersionSchema = TaskSchema.extend({
version: z.number().default(1).describe("版本号,每次更新递增"),
});
// 更新时验证版本
async function updateTask(task: Task): PersistenceResult<void> {
const existing = await this.load(task.id);
if (!existing.success || !existing.data) {
return { success: false, error: { code: "ENOENT", message: "任务不存在" } };
}
if (existing.data.version !== task.version) {
return {
success: false,
error: {
code: "ECONCURRENT",
message: "任务已被其他会话修改",
taskId: task.id
}
};
}
task.version++;
return this.save(task);
}
并发写入测试用例:
// packages/coding-agent/src/task/__tests__/persistence-concurrent.test.ts
describe("并发写入测试", () => {
it("多个进程同时写入同一任务应检测冲突", async () => {
const task = await taskPersistence.create("Test Task");
expect(task.success).toBe(true);
// 模拟两个会话同时修改
const session1Task = { ...task.data!, version: 1, subject: "Session 1" };
const session2Task = { ...task.data!, version: 1, subject: "Session 2" };
const result1 = await taskPersistence.save(session1Task);
expect(result1.success).toBe(true);
const result2 = await taskPersistence.save(session2Task);
expect(result2.success).toBe(false);
expect(result2.error?.code).toBe("ECONCURRENT");
});
it("文件锁应防止同时写入", async () => {
const task = await taskPersistence.create("Locked Task");
const lockHandle = await taskPersistence.tryLock(task.data!.id, 100);
// 另一个尝试获取锁应失败
const anotherLock = await taskPersistence.tryLock(task.data!.id, 100);
expect(anotherLock.success).toBe(false);
expect(anotherLock.error?.code).toBe("ELOCKED");
await taskPersistence.unlock(lockHandle.data!);
});
});
7.3 错误处理策略
异常场景处理矩阵:
| 异常场景 | 检测方式 | 处理策略 | 用户通知 | 恢复机制 |
|---|---|---|---|---|
| 磁盘空间不足 | ENOSPC 错误码 |
立即停止写入,返回错误 | "磁盘空间不足,请清理后重试" | 释放空间后手动重试 |
| 文件损坏 | JSON 解析失败 | 保留损坏文件,返回错误 | "任务文件损坏,正在尝试恢复" | 从备份恢复或手动修复 |
| 网络分区 | EIO 错误码 |
重试 3 次(间隔 1s),失败则返回错误 | "网络异常,请检查连接" | 网络恢复后自动重试 |
| 并发写入冲突 | 版本不匹配 | 返回错误,提示重试 | "任务已被其他会话修改,请刷新后重试" | 用户刷新后重新操作 |
| 锁超时 | 获取锁超时 | 返回错误,提示稍后重试 | "系统繁忙,请稍后重试" | 自动重试或用户手动重试 |
任务恢复机制设计:
// packages/coding-agent/src/task/recovery.ts
export interface RecoveryPoint {
id: string;
timestamp: number;
taskIds: number[];
snapshot: Record<number, Task>;
}
export class TaskRecoveryManager {
private backupDir = `${TASKS_DIR}/backups`;
/**
* 创建恢复点
* @param taskIds 要备份的任务ID列表
* @returns 恢复点ID
*/
async createRecoveryPoint(taskIds: number[]): Promise<string> {
const snapshot: Record<number, Task> = {};
for (const id of taskIds) {
const result = await this.persistence.load(id);
if (result.success && result.data) {
snapshot[id] = { ...result.data };
}
}
const recoveryPoint: RecoveryPoint = {
id: `rp-${Date.now()}`,
timestamp: Date.now(),
taskIds,
snapshot,
};
await fs.promises.writeFile(
`${this.backupDir}/${recoveryPoint.id}.json`,
JSON.stringify(recoveryPoint, null, 2)
);
// 保留最近 10 个恢复点
await this.cleanupOldBackups(10);
return recoveryPoint.id;
}
/**
* 从恢复点恢复
* @param recoveryPointId 恢复点ID
* @returns 恢复的任务数量
*/
async restoreFromRecoveryPoint(recoveryPointId: string): Promise<number> {
const backupPath = `${this.backupDir}/${recoveryPointId}.json`;
const data = await fs.promises.readFile(backupPath, 'utf8');
const recoveryPoint = JSON.parse(data) as RecoveryPoint;
let restoredCount = 0;
for (const task of Object.values(recoveryPoint.snapshot)) {
const result = await this.persistence.save(task);
if (result.success) restoredCount++;
}
return restoredCount;
}
private async cleanupOldBackups(keepCount: number): Promise<void> {
const backups = await fs.promises.readdir(this.backupDir);
const sorted = backups.sort().reverse();
for (const backup of sorted.slice(keepCount)) {
await fs.promises.unlink(`${this.backupDir}/${backup}`);
}
}
}
7.4 错误返回码定义
| 错误码 | 含义 | 所属模块 | 处理建议 |
|---|---|---|---|
ENOENT |
任务不存在 | TaskPersistence | 检查任务ID是否正确 |
EEXIST |
任务已存在 | TaskPersistence | 使用其他任务ID |
ELOCKED |
文件被锁定 | TaskPersistence | 稍后重试 |
EIO |
I/O 错误 | TaskPersistence | 检查存储设备和网络 |
ENOSPC |
磁盘空间不足 | TaskPersistence | 清理磁盘空间 |
EINVALID |
无效数据格式 | TaskPersistence | 检查数据格式 |
ECONCURRENT |
并发写入冲突 | TaskPersistence | 刷新后重试 |
E_CYCLE |
循环依赖 | DependencyManager | 调整依赖关系 |
E_NOT_FOUND |
任务不存在 | DependencyManager | 检查任务ID |
E_INVALID_DEP |
无效依赖关系 | DependencyManager | 检查依赖配置 |
八、测试覆盖计划
8.1 测试分层策略
| 测试层级 | 覆盖范围 | 工具 |
|---|---|---|
| 单元测试 | 单个模块/函数 | Vitest |
| 集成测试 | 模块间协作 | Vitest |
| 端到端测试 | 完整工作流 | Playwright |
| 性能测试 | 大规模任务图 | k6 / 自定义 |
| 并发测试 | 多会话并发访问 | Vitest + 自定义 |
| 容错测试 | 异常场景和恢复 | Vitest + 模拟 |
8.2 测试覆盖矩阵
迭代 1:持久化存储层
| 测试场景 | 描述 | 优先级 | 测试文件 |
|---|---|---|---|
| CRUD 操作 | 任务创建、读取、更新、删除 | 高 | persistence.test.ts |
| ID 自增 | 验证任务 ID 连续递增 | 高 | persistence.test.ts |
| 文件格式 | JSON 序列化/反序列化正确性 | 高 | persistence.test.ts |
| 边界条件 | 空任务、超大任务描述 | 中 | persistence.test.ts |
| 并发写入冲突 | 多进程同时写入同一任务 | 高 | persistence-concurrent.test.ts |
| 文件锁超时 | 锁等待超时处理 | 中 | persistence-concurrent.test.ts |
| 磁盘空间不足 | ENOSPC 错误处理 | 中 | persistence-error.test.ts |
| 网络分区 | EIO 错误重试机制 | 中 | persistence-error.test.ts |
| 文件损坏恢复 | JSON 解析失败处理 | 中 | persistence-error.test.ts |
| 乐观锁版本控制 | 版本不匹配检测 | 高 | persistence-concurrent.test.ts |
迭代 2:依赖图管理
| 测试场景 | 描述 | 优先级 | 测试文件 |
|---|---|---|---|
| 依赖添加/移除 | 正确维护 blockedBy 关系 | 高 | dependency-manager.test.ts |
| 自动解锁 | 完成任务后下游任务自动解锁 | 高 | dependency-manager.test.ts |
| 循环依赖检测 | 检测并报告循环依赖 | 高 | dependency-manager.test.ts |
| 拓扑排序 | 正确生成任务执行顺序 | 高 | dependency-manager.test.ts |
| 多种循环模式 | 三角形循环、链式循环等 | 中 | dependency-manager.test.ts |
| 任务恢复机制 | undo 撤销操作 | 中 | dependency-manager.test.ts |
| 批量完成任务 | 多个任务同时完成 | 中 | dependency-manager.test.ts |
| 自依赖检测 | 检测并拒绝自依赖 | 高 | dependency-manager.test.ts |
迭代 3-7:集成测试
| 测试场景 | 描述 | 优先级 | 测试文件 |
|---|---|---|---|
| 会话恢复 | 重启会话后任务图完整恢复 | 高 | task-manager.test.ts |
| 并发访问 | 多个会话同时访问任务图 | 高 | task-manager-concurrent.test.ts |
| 任务执行 | 任务图驱动的子代理执行 | 高 | task-execution.test.ts |
| 错误恢复 | 部分失败时的状态一致性 | 中 | task-manager.test.ts |
| 跨会话任务引用 | task:// URI 支持 | 中 | session-integration.test.ts |
| 任务图持久化 | 重启后状态保持 | 高 | task-manager.test.ts |
8.3 性能测试指标
| 指标 | 目标 | 测量方法 | 测试文件 |
|---|---|---|---|
| 任务加载时间 | < 100ms(1000 任务) | 计时加载操作 | performance.test.ts |
| 依赖图构建 | < 50ms(1000 任务) | 计时拓扑排序 | performance.test.ts |
| 文件写入延迟 | < 10ms(单任务) | 计时保存操作 | performance.test.ts |
| 并发吞吐量 | > 100 ops/s | 多线程并发写入 | performance.test.ts |
| 锁等待时间 | < 100ms(99%) | 统计锁获取延迟 | performance.test.ts |
8.4 容错测试场景
| 测试场景 | 模拟方式 | 验证目标 |
|---|---|---|
| 磁盘空间不足 | 使用磁盘配额限制或模拟 ENOSPC | 正确返回错误码,不损坏数据 |
| 网络分区 | 模拟 I/O 错误或网络断开 | 重试机制正常工作 |
| 进程崩溃 | 强制终止进程 | 重启后数据一致性 |
| 文件损坏 | 手动修改 JSON 文件 | 正确检测并保留损坏文件 |
| 锁残留 | 创建锁文件后不删除 | 自动清理过期锁 |
| 并发冲突 | 多线程同时写入 | 乐观锁检测冲突 |
8.5 测试辅助工具
// packages/coding-agent/src/task/__tests__/test-utils.ts
export class MockFileSystem {
private files: Record<string, string> = {};
private locks: Record<string, number> = {};
async writeFile(path: string, content: string, options?: { flag?: string }) {
// 模拟磁盘空间不足
if (Object.keys(this.files).length > 1000) {
throw Object.assign(new Error("ENOSPC"), { code: "ENOSPC" });
}
// 模拟原子写入(wx 标志)
if (options?.flag === 'wx' && this.files[path]) {
throw Object.assign(new Error("EEXIST"), { code: "EEXIST" });
}
this.files[path] = content;
}
async readFile(path: string, encoding?: string): Promise<string> {
if (!this.files[path]) {
throw Object.assign(new Error("ENOENT"), { code: "ENOENT" });
}
return this.files[path];
}
// ... 其他方法
}
九、迁移路径
9.1 会话任务迁移
自动迁移(默认启用):
// 会话启动时自动检测并迁移
async function migrateSessionTasks(sessionDir: string): Promise<number> {
const todoData = await loadSessionTodo(sessionDir);
if (!todoData) return 0;
const migratedCount = 0;
for (const phase of todoData.phases) {
for (const item of phase.tasks) {
await taskManager.create(item.content, item.notes?.join("\n"));
migratedCount++;
}
}
return migratedCount;
}
迁移配置:
export const MigrationConfigSchema = z.object({
/** 是否自动迁移会话任务(默认关闭,强制手动触发) */
autoMigrate: z.boolean().default(false),
/** 是否保留原会话任务(迁移后删除,默认保留确保数据安全) */
keepOriginal: z.boolean().default(true),
/** 是否在迁移后同步完成状态 */
syncCompleted: z.boolean().default(true),
});
9.2 迁移工作流
用户打开会话
│
▼
检测到未迁移的会话任务
│
▼
提示用户手动迁移
│
├───────────────────────┐
▼ ▼
手动迁移(默认) 自动迁移(可选)
│ │
▼ ▼
用户确认后迁移 创建任务图任务
│ │
└──────────┬────────────┘
▼
同步完成状态
│
▼
任务图可用
十、性能评估
10.1 大规模任务图优化
优化策略:
| 策略 | 实现方式 | 预期收益 |
|---|---|---|
| 增量加载 | 按需加载任务详情 | 减少启动时间 |
| 任务归档 | 自动归档已完成任务 | 降低内存占用 |
| 缓存机制 | 缓存依赖图计算结果 | 加速查询 |
| 异步写入 | 批量写入磁盘 | 降低 IO 压力 |
归档策略:
export interface ArchiveConfig {
/** 归档阈值(完成时间超过此天数的任务) */
archiveAfterDays: number;
/** 是否压缩归档文件 */
compressArchive: boolean;
/** 归档文件存储目录 */
archiveDir: string;
}
10.2 内存占用预估
| 任务数量 | 预估内存 | 优化后内存 |
|---|---|---|
| 100 | ~500KB | ~500KB |
| 1000 | ~5MB | ~3MB |
| 10000 | ~50MB | ~20MB |
十一、下一步行动
建议从迭代 1开始,先实现持久化存储层。每个迭代完成后:
- 运行单元测试验证
- 编写集成测试
- 更新文档
- 准备进入下一迭代
十二、适配器模式集成方案
12.1 核心设计思想
为了实现 s07 文档的核心目标——"任务图是多步工作的默认选择,Todo 仍可用于单次会话内的快速清单",本方案采用适配器模式实现新旧系统的无缝集成:
┌─────────────────────────────────────────────────────────────┐
│ 任务路由层(新增) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 判断任务类型 → 选择合适的工具 │ │
│ │ • 简单任务 → todo_write(会话级) │ │
│ │ • 复杂任务 → task_graph(跨会话级,默认) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────┴───────────────┐ │
│ ▼ ▼ │
┌───────────────────┐ ┌───────────────────────────┐ │
│ todo_write │ │ task_graph │ │
│ (快速清单) │ │ (默认:多步任务管理) │ │
└───────────────────┘ └───────────────────────────┘ │
│ │
▼ │
┌───────────────────┐ │
│ 适配器桥接层 │ │
│ import / sync │ │
└───────────────────┘ │
└─────────────────────────────────────────────────────────────┘
12.2 适配器接口设计
新增文件: packages/coding-agent/src/task/task-graph-adapter.ts
import * as z from "zod/v4";
import type { TodoPhase, TodoItem } from "../tools/todo-write";
/** 任务状态枚举(参考 swarm-extension 扩展) */
export const TaskStatus = z.enum([
"pending", // 待执行
"waiting", // 等待依赖
"in_progress", // 执行中
"completed", // 已完成
"failed", // 失败
"aborted", // 已中止
"abandoned", // 已放弃
]);
export type TaskStatus = z.infer<typeof TaskStatus>;
/** 任务数据结构(参考 swarm-extension 双向依赖设计) */
export const TaskSchema = z.object({
id: z.number().describe("Unique task identifier"),
subject: z.string().describe("Task title/summary"),
description: z.optional(z.string().describe("Detailed task description")),
status: TaskStatus,
blockedBy: z.array(z.number()).describe("List of task IDs this task depends on"),
blocks: z.array(z.number()).describe("List of task IDs blocked by this task"),
reportsTo: z.array(z.number()).describe("List of task IDs this task reports to"),
createdAt: z.number().describe("Unix timestamp in milliseconds"),
updatedAt: z.number().describe("Unix timestamp in milliseconds"),
});
export type Task = z.infer<typeof TaskSchema>;
/** 任务图接口 */
export interface TaskGraph {
create(subject: string, description?: string, blockedBy?: number[]): Task;
get(taskId: number): Task | null;
update(task: Task): void;
delete(taskId: number): void;
list(filter?: { status?: TaskStatus; blocked?: boolean }): Task[];
complete(taskId: number): void;
getAvailableTasks(): Task[];
getBlockedTasks(): Task[];
getExecutionWaves(): ExecutionWave[];
}
/** 执行波次(参考 swarm-extension/dag.ts) */
export interface ExecutionWave {
waveIndex: number;
taskIds: number[];
}
/**
* 任务图适配器 - 桥接新旧任务系统
* 核心目标:不修改现有代码,实现渐进式迁移
*/
export interface TaskGraphAdapter {
loadFromSession(sessionDir: string): TaskGraph;
saveToSession(sessionDir: string, graph: TaskGraph): void;
toTodoPhases(graph: TaskGraph): TodoPhase[];
fromTodoPhases(phases: TodoPhase[]): TaskGraph;
importAvailableTasks(graph: TaskGraph, limit?: number): TodoPhase[];
syncCompletedTask(graph: TaskGraph, item: TodoItem): boolean;
syncAllTasks(graph: TaskGraph, phases: TodoPhase[]): number;
}
12.3 智能任务路由器
新增文件: packages/coding-agent/src/task/task-router.ts
/** 任务复杂度分析结果 */
export interface TaskAnalysis {
complexity: "simple" | "complex";
reason: string;
estimatedSteps: number;
hasDependencies: boolean;
}
/**
* 任务路由器 - 根据任务特征选择合适的工具
* 核心逻辑:复杂任务自动使用 task_graph(默认),简单任务使用 todo_write
*/
export class TaskRouter {
/**
* 分析任务复杂度
* @param prompt 用户输入的任务描述
* @returns 任务复杂度分析结果
*/
analyzeTask(prompt: string): TaskAnalysis {
const features = this.extractFeatures(prompt);
// 判断复杂度:有依赖关系 或 步骤数 > 3 → 复杂任务
if (features.hasDependencies || features.estimatedSteps > 3) {
return {
complexity: "complex",
reason: features.hasDependencies
? "任务包含依赖关系"
: `任务预计需要 ${features.estimatedSteps} 个步骤`,
estimatedSteps: features.estimatedSteps,
hasDependencies: features.hasDependencies,
};
}
return {
complexity: "simple",
reason: "简单任务,适合会话内快速完成",
estimatedSteps: features.estimatedSteps,
hasDependencies: features.hasDependencies,
};
}
/**
* 根据任务复杂度选择合适的工具
* @param prompt 用户输入的任务描述
* @returns 推荐的工具名称
*/
selectTool(prompt: string): "todo_write" | "task_graph" {
const analysis = this.analyzeTask(prompt);
return analysis.complexity === "complex" ? "task_graph" : "todo_write";
}
}
12.4 统一任务工具
新增文件: packages/coding-agent/src/tools/task.ts
const TaskOp = z.enum([
"create", // 自动判断并创建任务
"create_simple", // 强制使用 todo_write
"create_complex", // 强制使用 task_graph
"analyze", // 分析任务复杂度
"list", // 列出所有任务(合并 todo 和 task_graph)
"import", // 从任务图导入可执行任务到会话
"sync", // 同步完成状态到任务图
]);
12.5 配置选项
修改文件: packages/coding-agent/src/config.ts
export const TaskConfigSchema = z.object({
/** 是否启用自动任务路由 */
autoRoute: z.boolean().default(true),
/** 默认工具(当自动路由关闭时使用) */
defaultTool: z.enum(["todo_write", "task_graph"]).default("task_graph"),
/** 复杂任务的步骤阈值 */
complexityThreshold: z.number().default(3),
/** 是否自动导入任务图到会话 */
autoImport: z.boolean().default(true),
/** 是否自动同步完成状态 */
autoSync: z.boolean().default(true),
/** 最大导入任务数量 */
importLimit: z.number().default(5),
/** 执行模式(参考 swarm-extension/schema.ts) */
executionMode: z.enum([
"default", // 默认模式,按依赖顺序执行
"pipeline", // 管道模式,支持多次迭代
"parallel", // 并行模式,无依赖任务同时执行
"sequential", // 顺序模式,按声明顺序执行
]).default("default"),
/** pipeline 模式专属:目标迭代次数 */
targetIterations: z.number().default(1),
/** parallel 模式专属:最大并发数 */
maxConcurrency: z.number().default(4),
});
12.6 工作流示例
场景 1:创建简单任务(自动路由到 todo_write)
用户:帮我创建一个任务:"完成文档"
助手:分析任务复杂度...简单任务(1个步骤,无依赖)
自动选择:todo_write
结果:创建了快速任务,存储在会话中
场景 2:创建复杂任务(自动路由到 task_graph)
用户:帮我规划这个功能:第一步设计 API,第二步实现核心逻辑,第三步编写测试
助手:分析任务复杂度...复杂任务(3个步骤)
自动选择:task_graph
结果:创建了 3 个任务,建立依赖链,自动导入第一个可用任务到会话
场景 3:列出所有任务
用户:列出所有任务
结果:
=== Session Tasks (todo_write) ===
[Quick Tasks]
[ ] 完成文档
=== Persistent Tasks (task_graph) ===
#1 [completed] 设计 API
#2 [pending] 实现核心逻辑 (blocked by: 1)
#3 [pending] 编写测试 (blocked by: 2)
12.7 与原始设计的对比
| 维度 | 原始设计(s07-task-system.md) | 适配器方案 |
|---|---|---|
| 向后兼容 | ❌ 替换现有系统 | ✅ 完全兼容 |
| 默认工具 | task_graph | task_graph(符合文档要求) |
| 快速清单 | ❌ 移除 | ✅ 保留 todo_write |
| 渐进式迁移 | ❌ 一次性迁移 | ✅ 支持分阶段迁移 |
| 智能路由 | ❌ 无 | ✅ 根据复杂度自动选择 |
| 集成方式 | 直接修改核心逻辑 | 通过适配器桥接 |
12.8 优势总结
| 优势 | 说明 |
|---|---|
| 符合文档要求 | 任务图是多步工作的默认选择 |
| 保留快速清单 | Todo 仍可用于单次会话内的快速任务 |
| 向后兼容 | 不修改现有代码,原有的工具和命令保持不变 |
| 智能路由 | 根据任务复杂度自动选择合适的工具 |
| 渐进式迁移 | 支持从并行运行到完全集成的分阶段迁移 |
| 职责分离 | 会话级任务与跨会话任务清晰分离 |
12.9 可借鉴的实现参考(claude_code/src)
根据对 claude_code/src 任务系统的深入分析,以下是可直接借鉴的代码参考点:
12.9.1 任务数据模型设计
参考文件: src/utils/tasks.ts(第 76-89 行)
借鉴点: 双向依赖关系设计
// 参考:双向依赖模型
const TaskSchema = z.object({
blocks: z.array(z.string()), // 此任务阻塞的任务
blockedBy: z.array(z.string()), // 阻塞此任务的任务
});
应用建议: 在 TaskSchema 中增加 blocks 字段,实现双向依赖追踪。
12.9.2 文件持久化策略
参考文件: src/utils/tasks.ts(第 221-231 行)
借鉴点: 文件级存储方案
// 参考:任务文件路径结构
export function getTaskPath(taskListId: string, taskId: string): string {
return join(getTasksDir(taskListId), `${taskId}.json`);
}
应用建议: 采用 ~/.omp/tasks/<taskListId>/<taskId>.json 结构存储任务。
12.9.3 并发安全机制
参考文件: src/utils/tasks.ts(第 102-108、284-308 行)
借鉴点: 文件锁 + 高水位标记
// 参考:锁配置
const LOCK_OPTIONS = {
retries: { retries: 30, minTimeout: 5, maxTimeout: 100 },
};
// 参考:创建任务时获取锁
const release = await lockfile.lock(lockPath, LOCK_OPTIONS);
const highestId = await findHighestTaskId(taskListId);
应用建议: 使用 proper-lockfile 实现文件级锁,保证多进程安全。
12.9.4 状态管理与事件通知
参考文件: src/utils/tasks.ts(第 17-67 行)
借鉴点: 响应式状态同步
// 参考:事件信号机制
const tasksUpdated = createSignal();
export const onTasksUpdated = tasksUpdated.subscribe;
export function notifyTasksUpdated(): void {
tasksUpdated.emit();
}
应用建议: 在 task-router.ts 中实现任务更新事件通知,支持 UI 响应式更新。
12.9.5 工具接口设计
参考文件: src/tools/TaskCreateTool/TaskCreateTool.ts(第 48-129 行)
借鉴点: 统一工具接口规范
// 参考:工具定义模式
export const TaskCreateTool = buildTool({
name: TASK_CREATE_TOOL_NAME,
async description() { return DESCRIPTION },
get inputSchema() { return inputSchema() },
get outputSchema() { return outputSchema() },
async call({ subject, description }, context) {
// 实现逻辑
},
});
应用建议: 在 task.ts 统一任务工具中采用相同的模式。
12.9.6 任务列表查询
参考文件: src/tools/TaskListTool/TaskListTool.ts(第 64-90 行)
借鉴点: 依赖过滤逻辑
// 参考:过滤已完成的依赖
const resolvedTaskIds = new Set(
allTasks.filter(t => t.status === 'completed').map(t => t.id)
);
const tasks = allTasks.map(task => ({
...task,
blockedBy: task.blockedBy.filter(id => !resolvedTaskIds.has(id)),
}));
应用建议: 在 TaskGraph.getAvailableTasks() 中实现类似的依赖过滤。
12.9.7 任务更新与依赖管理
参考文件: src/tools/TaskUpdateTool/TaskUpdateTool.ts(第 300-324 行)
借鉴点: 依赖关系更新
// 参考:建立双向依赖
if (addBlocks && addBlocks.length > 0) {
for (const blockId of newBlocks) {
await blockTask(taskListId, taskId, blockId);
}
}
应用建议: 在 TaskGraph.update() 中支持依赖关系的增删。
12.9.8 任务状态订阅
参考文件: src/hooks/useTasksV2.ts(第 29-200 行)
借鉴点: 响应式任务列表订阅
// 参考:文件监听 + 轮询回退
class TasksV2Store {
#watcher: FSWatcher | null = null;
#pollTimer: ReturnType<typeof setTimeout> | null = null;
#fetch = async (): Promise<void> => {
this.#rewatch(getTasksDir(taskListId));
const current = await listTasks(taskListId);
this.#tasks = current;
this.#notify();
};
}
应用建议: 在适配器中实现类似的任务列表订阅机制。
12.9.9 借鉴参考对照表
| claude_code 文件路径 | 借鉴内容 | 适配方案应用位置 |
|---|---|---|
src/utils/tasks.ts |
任务数据模型、双向依赖 | task-graph-adapter.ts |
src/utils/tasks.ts |
文件锁、高水位标记 | 并发安全机制 |
src/utils/tasks.ts |
事件信号机制 | 状态同步 |
src/hooks/useTasksV2.ts |
文件监听、轮询回退 | 任务列表订阅 |
src/tools/TaskCreateTool.ts |
工具接口规范 | task.ts |
src/tools/TaskUpdateTool.ts |
依赖管理 | TaskGraph.update() |
src/tools/TaskListTool.ts |
依赖过滤 | TaskGraph.getAvailableTasks() |
十二、Swarm-Extension DAG 功能分析与优化建议
12.10 Swarm-Extension DAG 功能概述
项目中已存在的 swarm-extension/src/swarm/dag.ts 提供了成熟的 DAG(有向无环图)实现,可作为任务图设计的重要参考:
12.10.1 核心功能
| 模块 | 文件路径 | 功能描述 |
|---|---|---|
| DAG 构建 | swarm/dag.ts |
构建依赖图、检测循环、生成执行波次 |
| 数据模型 | swarm/schema.ts |
Swarm 配置定义、YAML 解析 |
| 状态管理 | swarm/state.ts |
持久化状态追踪(.swarm_<name>/) |
| 执行器 | swarm/executor.ts |
执行单个 swarm agent |
| 流程控制 | swarm/pipeline.ts |
编排执行波次和迭代 |
12.10.2 关键设计特点
双向依赖关系(dag.ts 第 34-40 行):
- 支持
waits_for(显式等待)和reports_to(隐式依赖)两种关系定义 reports_to语义:A reports_to B 意味着 B 等待 A 完成
三种执行模式(schema.ts 第 27 行):
pipeline:管道模式,支持多次迭代(target_count)parallel:并行模式,所有无依赖关系的 Agent 并行执行sequential:顺序模式,按声明顺序依次执行
执行波次生成(dag.ts 第 106-146 行):
- 使用 Kahn's 算法进行拓扑排序
- 同一波次内的节点并行执行,波次间顺序执行
持久化状态追踪(state.ts 第 42-127 行):
- 持久化到
.swarm_<name>/目录 - 支持断点续传和状态恢复
12.11 功能覆盖对比
| 功能 | 文档设计 | Swarm-Extension 实现 | 优化建议 |
|---|---|---|---|
| 依赖关系 | blocked_by 单向依赖 |
waits_for + reports_to 双向依赖 |
✅ 采用双向依赖设计 |
| 执行模式 | 未提及 | pipeline/parallel/sequential | ✅ 引入多模式支持 |
| 拓扑排序 | 提及但未详述 | Kahn's 算法 + 执行波次 | ✅ 采用执行波次概念 |
| 循环检测 | 提及 | Kahn's 算法 | ✅ 保持一致 |
| 持久化 | .tasks/task_<id>.json |
.swarm_<name>/ 目录结构 |
✅ 扩展目录结构 |
| 状态恢复 | 未明确提及 | 完整支持断点续传 | ✅ 添加状态恢复机制 |
| 迭代执行 | 未提及 | 支持多次迭代 | ✅ 添加迭代支持 |
| YAML 配置 | 未提及 | 完整支持 YAML | ✅ 考虑 YAML 支持 |
12.12 架构优化建议
12.12.1 与 Swarm-Extension 的集成方案
建议集成策略:
- Task Graph:专注于任务管理(持久化、依赖管理、状态追踪)
- Swarm-Extension:专注于执行编排(多 Agent 协作、波次执行、迭代)
可能的桥接方案:
export class SwarmTaskBridge {
#taskGraph: TaskGraph;
#swarmDef: SwarmDefinition;
taskGraphToSwarm(): SwarmDefinition;
syncSwarmResult(result: PipelineResult): void;
}
12.12.2 代码复用建议
| Swarm-Extension 组件 | 复用内容 | 应用位置 |
|---|---|---|
buildExecutionWaves |
执行波次生成逻辑 | dependency-manager.ts |
detectCycles |
Kahn's 算法循环检测 | dependency-manager.ts |
StateTracker |
持久化设计 | persistence.ts |
PipelineController |
编排逻辑 | task-manager.ts |
十三、文档完备性说明
13.1 已补充的关键特性
本文档已补充 oh-my-pi 任务系统的以下关键特性:
13.1.1 todo-write 工具的 Markdown 双向转换
- 实现位置:
packages/coding-agent/src/tools/todo-write.ts(行号需确认) - 核心功能:
phasesToMarkdown():将任务列表渲染为标准 Markdown checklist 格式markdownToPhases():解析 Markdown checklist 并恢复为任务列表
- 支持的状态标记:
[ ]或[ ]:pending(待办)[x]或[X]:completed(已完成)[/]或[>]:in_progress(进行中)[-]或[~]:abandoned(已放弃)
- 备注支持:使用 blockquote 格式(
> text)附加任务备注 - 实际价值:用户可以手动编辑 Markdown 文件来管理任务,支持版本控制和协作
13.1.2 task 工具的计划模式集成
- 实现位置:
packages/coding-agent/src/task/index.ts第 614-623 行 - 触发条件:父会话启用计划模式(
planModeState.enabled === true) - 核心行为:
- 注入
plan-mode-subagent.md提示词到子代理的 system prompt - 限制子代理只能使用只读工具:
read、search、find、lsp、web_search - 禁用子代理的 spawns 能力(防止递归调用)
- 注入
- 实际价值:在规划阶段确保子代理只进行只读分析,不修改代码
13.1.3 task 工具的模型覆盖机制
- 实现位置:
packages/coding-agent/src/task/index.ts第 626-635 行 - 优先级顺序:
- 设置覆盖(
task.agentModelOverrides[agentName]) - Agent 定义中的模型(
agent.model) - 父会话的模型(
session.getActiveModelString())
- 设置覆盖(
- 环境变量支持:
PI_BLOCKED_AGENT防止递归调用(第 232 行) - 实际价值:允许为不同子代理配置不同的模型,优化成本和性能
10.1.4 task 工具的隔离执行流程
- 实现位置:
packages/coding-agent/src/task/index.ts第 670-1157 行 - 三种隔离模式:
- worktree 模式(第 888-889 行):
- 创建 Git worktree 进行隔离
- 适合 Git 仓库项目
- 支持分支合并模式
- fuse-overlay 模式(第 884 行):
- 使用 FUSE overlayfs 实现文件系统隔离
- 适合 Linux/macOS 系统
- 性能较好,但需要 FUSE 支持
- fuse-projfs 模式(第 886 行):
- 使用 FUSE projfs 实现文件系统隔离
- 适合 Windows 系统
- Windows 原生支持
- worktree 模式(第 888-889 行):
- 自动清理:任务执行完成后自动清理隔离环境(第 999-1007 行)
- 结果合并:
- patch 模式(第 1103-1151 行):生成并应用补丁
- branch 模式(第 1073-1101 行):创建并合并分支
13.1.5 executor.ts 子代理执行器详解
- 实现位置:
packages/coding-agent/src/task/executor.ts - 核心功能:负责在进程内运行子代理,转发 AgentEvents 进行进度追踪
- 执行流程:
- 初始化阶段(第 455-513 行):解析参数、检查中止状态、设置进度对象
- 配置阶段(第 515-560 行):设置 artifacts 路径、递归深度限制、工具列表、模型配置
- 事件处理阶段(第 722-910 行):注册事件监听器,处理
message_start、tool_execution_start/end、message_update/end、agent_end等事件 - 子代理执行(第 912-1192 行):创建会话、发送任务提示、等待空闲、处理 yield 工具调用
- 结果整理(第 1203-1291 行):处理输出、计算状态、生成最终结果
- IPC 通信机制:
- 通过
EventBus发送子代理事件到三个通道:TASK_SUBAGENT_EVENT_CHANNEL:转发所有 AgentEventTASK_SUBAGENT_LIFECYCLE_CHANNEL:发送生命周期事件(started/completed/failed/aborted)TASK_SUBAGENT_PROGRESS_CHANNEL:发送进度更新
- 支持 MCP 代理工具:通过
createMCPProxyTools()复用父会话的 MCP 连接(第 390-438 行)
- 通过
- 进度追踪:实时追踪工具调用次数、token 消耗、执行耗时(第 746-785 行)
- Yield 工具处理:自动检测 yield 工具调用,提取任务结果(第 804-806 行)
- 超时处理:支持 MCP 调用超时(60 秒)和中止信号处理(第 90-120 行)
13.1.6 render.ts TUI 渲染器详解
- 实现位置:
packages/coding-agent/src/task/render.ts - 核心功能:提供
renderCall和renderResult函数,在终端 UI 中显示任务执行状态 - 渲染组件:
- 状态图标(第 38-51 行):根据任务状态显示不同图标(pending/running/completed/failed/aborted)
- 进度渲染(第 491-629 行):
renderAgentProgress()渲染运行中的子代理进度 - 结果渲染(第 729-881 行):
renderAgentResult()渲染完成后的子代理结果 - JSON 树渲染(第 123-260 行):支持嵌套 JSON 数据的树形展示
- 审查结果渲染(第 634-676 行):
renderReviewResult()渲染代码审查结果(正确性、置信度、发现列表) - 查找结果渲染(第 681-724 行):
renderFindings()渲染审查发现,按优先级排序
- 扩展点:
- 通过
subprocessToolRegistry.register()注册自定义工具渲染器(第 1005-1014 行) - 支持自定义
extractData和renderFinal方法
- 通过
- 渲染模式:
- 折叠模式:显示摘要信息,限制行数(默认 3 行)
- 展开模式:显示完整内容,支持深度嵌套(默认 6 层)
- 主题支持:通过
Theme对象实现终端颜色和样式定制
13.1.7 SDK 集成说明
- 实现位置:
packages/coding-agent/src/sdk/ - 使用方式:
import { createAgentSession, discoverAuthStorage } from "@oh-my-pi/coding-agent"; // 创建任务执行会话 const authStorage = await discoverAuthStorage(); const { session } = await createAgentSession({ cwd: "/path/to/project", authStorage, toolNames: ["task"], // ... 其他配置 }); // 调用 task 工具执行子代理任务 const result = await session.prompt("Execute task", { attribution: "agent", }); - session-manager 集成:
- 会话初始化时自动加载任务图(第 592-594 行)
- 支持
task://URI 协议读取任务详情
- executor 集成:
runSubprocess()函数是任务执行的核心入口(第 455 行)- 支持通过
ExecutorOptions配置任务参数、模型覆盖、隔离模式等
13.2 API 设计风格说明
本文档的 API 设计遵循 oh-my-pi 现有代码风格:
13.2.1 使用 Zod v4 进行类型定义
// 推荐(与现有代码一致)
import * as z from "zod/v4";
export const TaskStatus = z.enum(["pending", "in_progress", "completed"]).describe("Task status");
export type TaskStatus = z.infer<typeof TaskStatus>;
export const TaskSchema = z.object({
id: z.number().describe("Unique task identifier"),
subject: z.string().describe("Task title/summary"),
// ...
});
export type Task = z.infer<typeof TaskSchema>;
参考位置:
packages/coding-agent/src/task/types.ts第 4 行(Zod v4 导入)packages/coding-agent/src/tools/todo-write.ts第 6 行(Zod v4 导入)
13.2.2 错误处理模式
oh-my-pi 采用返回值错误处理模式,避免抛出异常:
// 推荐(返回 null 而非抛出异常)
export interface TaskPersistence {
load(taskId: number): Task | null;
// ...
}
// 推荐(返回结构化错误信息)
validateDAG(): { valid: boolean; cycle?: number[] };
参考位置:
packages/coding-agent/src/task/index.ts第 534-550 行(错误返回模式)packages/coding-agent/src/task/types.ts第 164-186 行(结构化错误定义)
工具级错误处理示例(task/index.ts 第 553-570 行):
// 检查代理是否在设置中被禁用
const disabledAgents = this.session.settings.get("task.disabledAgents") as string[];
if (disabledAgents.length > 0 && disabledAgents.includes(agentName)) {
const enabled = agents.filter(a => !disabledAgents.includes(a.name)).map(a => a.name);
return {
content: [{
type: "text",
text: `Agent "${agentName}" is disabled in settings.
Enable it via /agents, or use a different agent type.
${enabled.length > 0 ? ` Available: ${enabled.join(", ")}` : ""}`
}],
details: { projectAgentsDir, results: [], totalDurationMs: 0 },
};
}
核心原则:
- 工具执行失败时返回带有
content和details的标准结果对象 - 错误信息包含在
content数组中的text类型项 details保持完整结构,results为空数组,totalDurationMs记录耗时
13.2.3 接口方法命名
- 使用动词开头:
create、load、save、delete - 查询方法使用
get或list前缀:getTask、listTasks - 布尔查询使用
is前缀:isAvailable、isBlocked
13.2.4 并发控制机制
oh-my-pi 通过 parallel.ts 提供并发控制能力:
Semaphore 信号量类(task/parallel.ts 第 89-115 行):
export class Semaphore {
#max: number;
#current = 0;
#queue: Array<() => void> = [];
constructor(max: number) {
this.#max = Math.max(1, max);
}
async acquire(): Promise<void> {
if (this.#current < this.#max) {
this.#current++;
return;
}
const { promise, resolve } = Promise.withResolvers<void>();
this.#queue.push(resolve);
return promise;
}
release(): void {
const next = this.#queue.shift();
if (next) {
next();
} else {
this.#current--;
}
}
}
mapWithConcurrencyLimit 并行执行函数(task/parallel.ts 第 26-84 行):
export async function mapWithConcurrencyLimit<T, R>(
items: T[],
concurrency: number,
fn: (item: T, index: number) => Promise<R>,
signal?: AbortSignal,
): Promise<ParallelResult<R>> {
// 创建 worker pool,限制并发数
const workers = Array(limit).fill(null).map(() => worker());
// ...
}
使用方式(task/index.ts):
const results = await mapWithConcurrencyLimit(
tasks,
maxConcurrency, // 从 settings.get("task.maxConcurrency") 获取
async (task, index) => {
return await runSubprocess({ ...options, task });
},
signal,
);
并发配置:通过 task.maxConcurrency 设置控制最大并发数。
13.3 架构图补充说明
本文档第 3.3 节的架构图已更新,补充了以下细节:
13.3.1 todo-write 工具架构
- 明确展示 Markdown 双向转换能力
- 展示 Phase 分组管理机制
- 说明 Session 存储方式(
USER_TODO_EDIT_CUSTOM_TYPE)
13.3.2 task 工具架构
- 展示 Agent 发现机制的三层结构
- 详细说明三种隔离执行模式
- 展示结果合并的两种模式
- 补充计划模式集成和模型覆盖机制
10.4 与现有代码的兼容性
本文档的设计确保与现有代码的兼容性:
| 方面 | 兼容性保证 |
|---|---|
| 类型系统 | 使用 Zod v4,与现有类型定义风格一致 |
| 错误处理 | 返回 null 或结构化错误,不抛出异常 |
| 命名规范 | 遵循现有命名约定(动词开头、驼峰命名) |
| 文件组织 | 新增文件放在 packages/coding-agent/src/task/ 目录 |
| 工具集成 | 新增工具命令作为 task 工具的扩展,不修改现有工具 |
| 会话集成 | 通过接口扩展,不修改核心会话逻辑 |
十四、需要修改的现有代码列表
14.1 修改策略概述
为最小化对现有代码的侵入,采用独立模块 + 最小引用方案:
| 策略 | 说明 | 侵入性 |
|---|---|---|
| 新建独立模块 | 任务图功能完全封装在新文件中 | 零侵入 |
| 最小引用集成 | 原文件仅添加导入和调用,不修改核心逻辑 | 低侵入 |
| 配置开关控制 | 通过配置决定是否启用任务图功能 | 风险可控 |
14.2 需要修改的现有文件(最小侵入)
| 文件路径 | 修改目的 | 修改类型 | 侵入性 | 改动量 |
|---|---|---|---|---|
packages/coding-agent/src/task/types.ts |
添加 TaskGraph、Task 等新类型定义 | 扩展 | 低 | < 50 行 |
packages/coding-agent/src/task/index.ts |
添加 TaskManager 导入和调用入口 | 扩展 | 低 | < 30 行 |
packages/coding-agent/src/session/session-manager.ts |
添加任务图加载钩子和 URI 解析方法 | 扩展 | 低 | < 40 行 |
packages/coding-agent/src/config.ts |
添加任务系统配置选项 | 扩展 | 低 | < 30 行 |
14.3 需要新增的文件(零侵入)
| 文件路径 | 目的 | 职责 |
|---|---|---|
packages/coding-agent/src/task/persistence.ts |
任务持久化层 | 读写 .tasks/task_{id}.json,文件锁控制 |
packages/coding-agent/src/task/dependency-manager.ts |
依赖关系管理 | 依赖添加/移除、自动解锁、DAG 验证 |
packages/coding-agent/src/task/task-manager.ts |
任务管理器核心 | 组合持久化和依赖管理,提供统一接口 |
packages/coding-agent/src/task/task-graph-adapter.ts |
适配器桥接层 | 桥接新旧任务系统,支持数据迁移 |
packages/coding-agent/src/task/task-router.ts |
智能任务路由器 | 根据任务复杂度选择工具 |
packages/coding-agent/src/tools/task.ts |
统一任务工具入口 | 提供 task_create/update/list/get/complete 命令 |
14.4 修改详情
14.4.1 packages/coding-agent/src/task/types.ts
修改目的:添加持久化任务图所需的核心类型定义
// 新增任务状态枚举
export const TaskStatus = z.enum(["pending", "in_progress", "completed"]);
export type TaskStatus = z.infer<typeof TaskStatus>;
// 新增任务定义
export const TaskSchema = z.object({
id: z.number().describe("Unique task identifier"),
subject: z.string().describe("Task title/summary"),
description: z.string().optional().describe("Detailed description"),
status: TaskStatus,
blockedBy: z.array(z.number()).default([]).describe("Task IDs that block this task"),
createdAt: z.number().describe("Unix timestamp"),
updatedAt: z.number().describe("Unix timestamp"),
completedAt: z.number().optional().describe("Unix timestamp when completed"),
});
export type Task = z.infer<typeof TaskSchema>;
// 新增任务图定义
export interface TaskGraph {
tasks: Map<number, Task>;
nextId: number;
}
14.4.2 packages/coding-agent/src/task/index.ts
修改目的:添加 TaskManager 集成入口(最小侵入)
// 新增导入
import { TaskManager } from "./task-manager";
class TaskTool {
#taskManager: TaskManager | null = null;
async execute(
_toolCallId: string,
rawParams: unknown,
signal?: AbortSignal,
onUpdate?: AgentToolUpdateCallback<TaskToolDetails>,
): Promise<AgentToolResult<TaskToolDetails>> {
const params = rawParams as TaskParams;
// 初始化 TaskManager(懒加载)
if (!this.#taskManager) {
this.#taskManager = new TaskManager({
tasksDir: path.join(this.session.cwd, ".tasks"),
settings: this.session.settings,
});
}
// 判断是否为任务图命令
if (params.command === "create_task" || params.command === "complete_task") {
return this.#executeTaskGraphCommand(params, signal, onUpdate);
}
// 原有执行逻辑保持不变
// ...
}
/**
* 执行任务图相关命令(新增方法,不修改原有逻辑)
*/
async #executeTaskGraphCommand(
params: TaskParams,
signal?: AbortSignal,
onUpdate?: AgentToolUpdateCallback<TaskToolDetails>,
): Promise<AgentToolResult<TaskToolDetails>> {
if (!this.#taskManager) {
return createTaskModeError("Task manager not initialized");
}
switch (params.command) {
case "create_task":
const task = this.#taskManager.create(
params.subject,
params.description,
params.blockedBy,
);
return {
content: [{ type: "text", text: `Created task #${task.id}: ${task.subject}` }],
details: { projectAgentsDir: null, results: [], totalDurationMs: 0 },
};
case "complete_task":
const completed = this.#taskManager.complete(params.taskId);
return {
content: [{ type: "text", text: `Completed task #${params.taskId}` }],
details: { projectAgentsDir: null, results: [], totalDurationMs: 0 },
};
default:
return createTaskModeError(`Unknown command: ${params.command}`);
}
}
}
侵入性说明:
- 仅添加导入和新方法,不修改原有
execute()和#executeSync()方法 - 通过懒加载延迟初始化,不影响启动性能
- 原有子代理执行功能保持完全独立
14.4.3 packages/coding-agent/src/session/session-manager.ts
修改目的:添加任务图加载钩子和 URI 解析(最小侵入)
// 新增导入
import { TaskManager } from "../task/task-manager";
import { TaskUriHandler } from "./task-uri-handler";
class SessionManager {
#taskManager: TaskManager | null = null;
async #initSessionFile(sessionFile: string): Promise<void> {
await this.setSessionFile(sessionFile);
// 条件加载任务图(通过配置控制)
if (this.#shouldLoadTaskGraph()) {
await this.#initTaskManager();
}
}
/**
* 初始化任务管理器(懒加载)
*/
async #initTaskManager(): Promise<void> {
try {
const tasksDir = path.join(this.cwd, ".tasks");
this.#taskManager = new TaskManager({ tasksDir });
await this.#taskManager.load();
} catch {
// 任务图加载失败不影响会话启动
this.#taskManager = null;
}
}
/**
* 判断是否应该加载任务图
*/
#shouldLoadTaskGraph(): boolean {
return this.session.settings.get("task.graph.enabled") === true;
}
/**
* 解析 task:// URI(新增方法)
*/
resolveTaskUri(uri: string): Task | undefined {
if (!this.#taskManager) return undefined;
return TaskUriHandler.resolve(uri, this.#taskManager);
}
/**
* 获取任务管理器(新增方法)
*/
getTaskManager(): TaskManager | null {
return this.#taskManager;
}
}
侵入性说明:
- 新增私有字段
#taskManager(不影响公共 API) - 新增方法
resolveTaskUri()和getTaskManager()(扩展公共 API) - 在
#initSessionFile()中添加条件加载钩子(低侵入) - 任务图加载失败不影响会话启动(容错设计)
14.4.4 packages/coding-agent/src/config.ts
修改目的:添加任务系统配置选项
export const TaskConfigSchema = z.object({
// 任务图核心配置
graph: z.object({
enabled: z.boolean().default(false), // 默认关闭,保持核心功能纯净
autoLoad: z.boolean().default(true),
}),
// 迁移配置(独立模块控制)
migration: z.object({
enabled: z.boolean().default(false), // 默认关闭迁移功能
keepOriginal: z.boolean().default(true), // 保留原数据
syncCompleted: z.boolean().default(true),
}),
// 路由配置
router: z.object({
autoRoute: z.boolean().default(true),
defaultTool: z.enum(["todo_write", "task_graph"]).default("task_graph"),
complexityThreshold: z.number().default(3),
}),
// 导入配置
import: z.object({
autoImport: z.boolean().default(true),
autoSync: z.boolean().default(true),
importLimit: z.number().default(5),
}),
});
export type TaskConfig = z.infer<typeof TaskConfigSchema>;
侵入性说明:
- 仅添加新配置项,不修改现有配置结构
- 默认值设计确保新环境零开销
14.5 架构设计优势
14.5.1 最小侵入架构图
┌─────────────────────────────────────────────────────────────────┐
│ 原有代码层 │
│ ┌─────────────────────────┐ ┌───────────────────────────┐ │
│ │ task/index.ts │ │ session-manager.ts │ │
│ │ (原有子代理执行逻辑) │ │ (原有会话管理逻辑) │ │
│ └───────────┬─────────────┘ └─────────────┬───────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 桥接层(最小侵入) │ │
│ │ • 导入 TaskManager │ │
│ │ • 添加命令分发入口 │ │
│ │ • 条件加载任务图 │ │
│ │ • 提供 URI 解析方法 │ │
│ └───────────────────┬─────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 新建独立模块层(零侵入) │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ task/task-manager.ts - 任务管理器核心 │ │ │
│ │ │ task/persistence.ts - 持久化存储层 │ │ │
│ │ │ task/dependency-manager.ts - 依赖关系管理 │ │ │
│ │ │ task/task-graph-adapter.ts - 适配器桥接层 │ │ │
│ │ │ task/task-router.ts - 智能任务路由 │ │ │
│ │ │ tools/task.ts - 统一任务工具入口 │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
14.5.2 侵入性对比总结
| 方案 | 修改文件数 | 代码改动量 | 侵入等级 | 风险 |
|---|---|---|---|---|
| 直接修改核心逻辑 | 2 | 中-高 | 高 | 影响现有功能 |
| 独立新文件 + 最小引用 | 4(修改)+ 6(新增) | 低-中 | 低 | 风险隔离 |
| 完全独立模块 | 0(修改)+ 6(新增) | 低 | 零 | 需要额外集成 |
推荐方案:独立新文件 + 最小引用
14.5.3 关键设计原则
| 原则 | 实现方式 | 价值 |
|---|---|---|
| 开闭原则 | 对扩展开放(新增模块),对修改关闭(原文件不变) | 不影响现有功能 |
| 单一职责 | 每个新文件只负责一个功能 | 易于测试和维护 |
| 依赖倒置 | 原有代码依赖抽象接口 | 降低耦合度 |
| 懒加载 | TaskManager 延迟初始化 | 不影响启动性能 |
| 容错设计 | 任务图加载失败不影响会话 | 提高健壮性 |
十五、API 兼容性
15.1 SDK 集成示例
15.1.1 创建任务执行会话
import { createAgentSession, discoverAuthStorage } from "@oh-my-pi/coding-agent";
import type { TaskManager, Task } from "@oh-my-pi/coding-agent";
// 创建任务执行会话
const authStorage = await discoverAuthStorage();
const { session, taskManager } = await createAgentSession({
cwd: "/path/to/project",
authStorage,
toolNames: ["task", "todo_write"],
enableTaskGraph: true, // 启用持久化任务图
});
// 使用 TaskManager API
// 创建任务
const task: Task = await taskManager.create(
"Implement user authentication",
"Add JWT token-based authentication with refresh token support",
[1, 2] // blockedBy: 依赖任务 1 和 2
);
// 查询任务
const availableTasks = await taskManager.getAvailableTasks();
const blockedTasks = await taskManager.getBlockedTasks();
// 完成任务(自动解锁下游)
await taskManager.complete(task.id);
// 监听任务更新
taskManager.on("taskUpdated", (updatedTask: Task) => {
console.log(`Task ${updatedTask.id} status changed to ${updatedTask.status}`);
});
15.1.2 会话级任务 API
// 使用 todo_write 工具创建会话级任务
const todoResult = await session.prompt(`
我需要完成以下任务:
1. 设计数据库 schema
2. 实现 API 接口
3. 编写单元测试
`, {
attribution: "agent",
toolPreferences: ["todo_write"], // 强制使用会话级任务
});
// 使用 task_graph 创建持久化任务图
const taskGraphResult = await session.prompt(`
帮我规划这个功能开发:
第一步:设计 API(需要先完成需求文档)
第二步:实现核心逻辑(依赖第一步)
第三步:编写测试(依赖第二步)
第四步:部署上线(依赖第三步)
`, {
attribution: "agent",
toolPreferences: ["task_graph"], // 强制使用持久化任务图
});
15.1.3 任务图与会话集成
// 会话启动时自动加载任务图
const session = await createAgentSession({
cwd: "/path/to/project",
authStorage,
autoLoadTaskGraph: true, // 自动加载 .tasks/ 目录
autoMigrateSessionTasks: false, // 手动迁移会话任务(需通过命令触发)
});
// 访问任务图
const taskGraph = session.getTaskGraph();
// 使用 task:// URI 读取任务详情
const task = await session.readUri("task://1");
// 监听任务状态变化
session.on("taskStatusChange", (event) => {
console.log(`Task ${event.taskId}: ${event.oldStatus} -> ${event.newStatus}`);
});
15.1.4 子代理执行集成
import { runSubprocess, TaskExecutionOptions } from "@oh-my-pi/coding-agent";
// 执行任务图中的可用任务
const options: TaskExecutionOptions = {
agent: "code-assistant",
taskGraph: taskManager,
concurrency: 2,
isolationMode: "worktree",
onProgress: (progress) => {
console.log(`Task ${progress.taskId}: ${progress.status}`);
},
};
const results = await runSubprocess(options);
// 处理执行结果
for (const result of results) {
if (result.exitCode === 0) {
await taskManager.complete(result.taskId);
}
}
15.2 向后兼容性保证
| API 特性 | 兼容性 | 说明 |
|---|---|---|
todo_write 工具 |
✅ 完全兼容 | 原有工具保持不变 |
task 工具(子代理执行) |
✅ 完全兼容 | 原有功能保持不变 |
| 会话任务存储 | ✅ 完全兼容 | 迁移默认关闭,需手动触发 |
| 工具调用方式 | ✅ 完全兼容 | 新增工具不影响现有调用 |
15.3 API 版本控制
// 版本化 API
export const TaskGraphAPIVersion = "v1";
export interface TaskGraphAPI {
version: string;
create: (subject: string, description?: string, blockedBy?: number[]) => Task;
get: (taskId: number) => Task | null;
update: (taskId: number, updates: Partial<Task>) => void;
delete: (taskId: number) => void;
list: (filter?: TaskFilter) => Task[];
complete: (taskId: number) => void;
}
十六、配置迁移指南
16.1 分阶段发布策略(方案 B)
为保证功能的原子性和内敛性,迁移功能采用分阶段发布方案:
发布阶段 | 功能范围 | 迁移支持
-------------|-----------------------------------|---------
v1.0 | 核心任务图功能(持久化、依赖管理) | ❌ 无迁移
v1.1 | 添加迁移工具和命令 | ✅ 手动迁移
v1.2 | 完善迁移体验和验证 | ✅ 增强迁移
设计原则:
- 原子性:迁移功能完全独立于核心任务图功能
- 内敛性:迁移逻辑封装在独立模块,不污染核心接口
- 风险隔离:迁移失败不影响核心功能正常使用
- 渐进式采用:用户可选择迁移时机
16.2 迁移工作流
┌─────────────────────────────────────────────────────────────┐
│ 用户决定迁移 │
└───────────────────┬─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 执行命令: omp task migrate --dry-run │
│ 预览待迁移的会话任务 │
└───────────────────┬─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 用户确认后执行: omp task migrate │
└───────────────────┬─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 创建任务图任务(保留 phase 信息作为 description) │
└───────────────────┬─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 同步完成状态(可选) │
└───────────────────┬─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 显示迁移结果报告 │
└─────────────────────────────────────────────────────────────┘
16.3 迁移配置详解
export const MigrationConfigSchema = z.object({
/** 是否启用迁移功能(默认关闭,保持核心功能纯净) */
enabled: z.boolean().default(false),
/** 是否在迁移后保留原会话任务(默认保留,确保数据安全) */
keepOriginal: z.boolean().default(true),
/** 是否在迁移后同步完成状态 */
syncCompleted: z.boolean().default(true),
/** 迁移日志级别 */
logLevel: z.enum(["debug", "info", "warn", "error"]).default("info"),
});
配置说明:
enabled: false:默认关闭迁移功能,确保新环境零开销keepOriginal: true:默认保留原任务,防止数据丢失- 无自动迁移配置:迁移必须手动触发
16.4 手动迁移命令
# 查看待迁移的会话任务(预览模式,不执行迁移)
omp task migrate --dry-run
# 执行迁移(保留原任务)
omp task migrate
# 执行迁移并删除原任务(谨慎使用)
omp task migrate --remove-original
# 仅迁移已完成的任务
omp task migrate --only-completed
# 迁移指定会话的任务
omp task migrate --session <session-id>
# 查看迁移状态
omp task migrate --status
# 验证迁移结果(检查数据完整性)
omp task migrate --verify
16.5 迁移数据格式转换
| 会话任务字段 | 任务图字段 | 转换规则 |
|---|---|---|
content |
subject |
直接映射 |
notes |
description |
用换行符连接 |
status |
status |
四态 → 三态(abandoned → pending) |
phase.name |
description 前缀 |
添加到 description 开头 |
| 无 | blockedBy |
初始为空数组 |
| 无 | createdAt |
使用会话创建时间 |
| 无 | updatedAt |
使用当前时间 |
16.6 迁移模块架构
// 独立的迁移模块,不依赖核心功能启动
export class MigrationService {
constructor(
private taskManager: TaskManager,
private sessionManager: SessionManager
) {}
/**
* 检查是否有待迁移的会话任务
*/
async hasPendingMigration(): Promise<boolean> {
const todoData = await this.sessionManager.loadSessionTodo();
return todoData && todoData.phases.length > 0;
}
/**
* 预览迁移(不执行实际迁移)
*/
async preview(): Promise<MigrationPreview> {
const todoData = await this.sessionManager.loadSessionTodo();
if (!todoData) {
return { tasks: [], total: 0, warnings: [] };
}
return {
tasks: todoData.phases.flatMap(phase =>
phase.tasks.map(task => ({
content: task.content,
status: task.status,
phase: phase.name,
}))
),
total: todoData.phases.reduce((sum, p) => sum + p.tasks.length, 0),
warnings: [],
};
}
/**
* 执行迁移
*/
async migrate(options: MigrationOptions): Promise<MigrationResult> {
const todoData = await this.sessionManager.loadSessionTodo();
if (!todoData) {
return { success: true, migrated: 0, skipped: 0, errors: [] };
}
const migrated: number[] = [];
const errors: MigrationError[] = [];
for (const phase of todoData.phases) {
for (const item of phase.tasks) {
try {
const task = await this.taskManager.create(
item.content,
`${phase.name}: ${item.notes?.join("\n") || ""}`,
[]
);
migrated.push(task.id);
if (options.syncCompleted && item.status === "completed") {
await this.taskManager.complete(task.id);
}
} catch (err) {
errors.push({
content: item.content,
error: err instanceof Error ? err.message : String(err),
});
}
}
}
// 仅在成功且用户确认时删除原任务
if (options.removeOriginal && errors.length === 0) {
await this.sessionManager.clearSessionTodo();
}
return {
success: errors.length === 0,
migrated: migrated.length,
skipped: 0,
errors,
};
}
/**
* 验证迁移结果
*/
async verify(): Promise<MigrationVerification> {
// 对比源数据和目标数据的完整性
}
}
16.7 迁移验证
// 迁移验证接口
export interface MigrationValidator {
validateSource: (sessionTasks: TodoPhase[]) => MigrationValidationResult;
validateTarget: (taskGraph: TaskGraph) => MigrationValidationResult;
compare: (source: TodoPhase[], target: Task[]) => MigrationComparison;
}
// 验证结果
export interface MigrationValidationResult {
valid: boolean;
errors: string[];
warnings: string[];
stats: {
totalTasks: number;
migratedTasks: number;
skippedTasks: number;
failedTasks: number;
};
}
// 迁移预览结果
export interface MigrationPreview {
tasks: Array<{ content: string; status: TodoStatus; phase: string }>;
total: number;
warnings: string[];
}
// 迁移执行结果
export interface MigrationResult {
success: boolean;
migrated: number;
skipped: number;
errors: MigrationError[];
}
export interface MigrationError {
content: string;
error: string;
}
16.8 方案 B 的优势总结
| 维度 | 方案 B(分阶段发布) | 说明 |
|---|---|---|
| 原子性 | ✅ 优秀 | 迁移功能完全独立,可单独测试和发布 |
| 内敛性 | ✅ 优秀 | 核心模块保持纯净,迁移逻辑封装在独立模块 |
| 风险控制 | ✅ 优秀 | 迁移失败不影响核心功能启动 |
| 新环境开销 | ✅ 零开销 | 默认关闭迁移,新环境无检测开销 |
| 用户体验 | ✅ 可控 | 用户可选择迁移时机,支持预览 |
| 代码复杂度 | ✅ 较低 | 模块职责清晰,耦合度低 |
十七、监控指标
17.1 核心指标
| 指标名称 | 类型 | 说明 | 单位 |
|---|---|---|---|
task.graph.tasks.total |
Gauge | 任务图中任务总数 | 个 |
task.graph.tasks.pending |
Gauge | 待执行任务数 | 个 |
task.graph.tasks.in_progress |
Gauge | 执行中任务数 | 个 |
task.graph.tasks.completed |
Gauge | 已完成任务数 | 个 |
task.graph.tasks.blocked |
Gauge | 被阻塞任务数 | 个 |
task.graph.dependencies.total |
Gauge | 依赖关系总数 | 条 |
task.graph.dependencies.cycles |
Gauge | 循环依赖数 | 个 |
17.2 执行指标
| 指标名称 | 类型 | 说明 | 单位 |
|---|---|---|---|
task.execution.count |
Counter | 任务执行总次数 | 次 |
task.execution.success |
Counter | 成功执行次数 | 次 |
task.execution.failure |
Counter | 失败执行次数 | 次 |
task.execution.duration |
Histogram | 执行耗时分布 | ms |
task.execution.concurrency |
Gauge | 当前并发执行数 | 个 |
17.3 持久化指标
| 指标名称 | 类型 | 说明 | 单位 |
|---|---|---|---|
task.persistence.read.count |
Counter | 文件读取次数 | 次 |
task.persistence.write.count |
Counter | 文件写入次数 | 次 |
task.persistence.lock.wait |
Histogram | 锁等待时间 | ms |
task.persistence.lock.timeout |
Counter | 锁超时次数 | 次 |
task.persistence.error.count |
Counter | 持久化错误次数 | 次 |
17.4 监控集成示例
import { EventBus } from "@oh-my-pi/pi-utils";
import { taskGraphMetrics } from "./metrics";
// 订阅任务更新事件
EventBus.subscribe("task:updated", (event) => {
taskGraphMetrics.tasks.total.set(event.totalTasks);
taskGraphMetrics.tasks.pending.set(event.pendingTasks);
taskGraphMetrics.tasks.in_progress.set(event.inProgressTasks);
taskGraphMetrics.tasks.completed.set(event.completedTasks);
});
// 订阅执行事件
EventBus.subscribe("task:execution:completed", (event) => {
taskGraphMetrics.execution.count.inc();
if (event.success) {
taskGraphMetrics.execution.success.inc();
} else {
taskGraphMetrics.execution.failure.inc();
}
taskGraphMetrics.execution.duration.observe(event.durationMs);
});
// 暴露指标(Prometheus 格式)
export function getMetrics(): string {
return [
`# HELP task_graph_tasks_total Total number of tasks`,
`# TYPE task_graph_tasks_total gauge`,
`task_graph_tasks_total ${taskGraphMetrics.tasks.total.value}`,
// ... 其他指标
].join("\n");
}
17.5 告警规则
# 任务阻塞告警
groups:
- name: task-graph-alerts
rules:
- alert: TaskGraphHighBlockedRatio
expr: task_graph_tasks_blocked / task_graph_tasks_total > 0.5
for: 5m
labels:
severity: warning
annotations:
summary: "任务图阻塞率超过 50%"
description: "当前阻塞任务数: {{ $value }}"
- alert: TaskGraphCycleDetected
expr: task_graph_dependencies_cycles > 0
for: 1m
labels:
severity: critical
annotations:
summary: "检测到循环依赖"
description: "循环依赖数: {{ $value }}"
- alert: TaskExecutionFailureRate
expr: task_execution_failure / task_execution_count > 0.1
for: 10m
labels:
severity: warning
annotations:
summary: "任务执行失败率超过 10%"
description: "失败次数: {{ $value }}"
十八、性能基准
18.1 性能测试配置
| 测试参数 | 值 |
|---|---|
| CPU | Intel i7-12700K (12 cores) |
| 内存 | 32GB DDR4 |
| 存储 | NVMe SSD |
| Node.js | 20.x |
| 测试任务数 | 100, 1000, 10000 |
18.2 性能测试结果
18.2.1 任务加载性能
| 任务数量 | 加载时间 | 内存占用 |
|---|---|---|
| 100 | < 10ms | ~500KB |
| 1000 | < 50ms | ~5MB |
| 10000 | < 200ms | ~50MB |
18.2.2 依赖图操作性能
| 操作 | 1000 任务 | 10000 任务 |
|---|---|---|
| 拓扑排序 | < 10ms | < 50ms |
| 依赖检测 | < 5ms | < 30ms |
| 自动解锁 | < 3ms | < 20ms |
18.2.3 持久化操作性能
| 操作 | 单任务 | 批量(100任务) |
|---|---|---|
| 写入 | < 5ms | < 50ms |
| 读取 | < 2ms | < 20ms |
| 删除 | < 3ms | < 30ms |
18.2.4 并发性能
| 并发数 | 吞吐量 | 平均延迟 |
|---|---|---|
| 1 | 100 ops/s | 10ms |
| 4 | 350 ops/s | 11ms |
| 8 | 500 ops/s | 16ms |
| 16 | 600 ops/s | 27ms |
18.3 性能优化策略
18.3.1 增量加载
// 按需加载任务详情
async function loadTaskSummary(taskId: number): Promise<TaskSummary> {
// 只加载基本信息,延迟加载详情
const summary = await this.cache.get(taskId);
if (summary) return summary;
const fullTask = await this.loadFullTask(taskId);
const taskSummary: TaskSummary = {
id: fullTask.id,
subject: fullTask.subject,
status: fullTask.status,
blockedBy: fullTask.blockedBy.length,
};
await this.cache.set(taskId, taskSummary, { ttl: 60000 });
return taskSummary;
}
18.3.2 任务归档
// 自动归档已完成任务
async function archiveCompletedTasks(maxAgeDays: number): Promise<number> {
const cutoff = Date.now() - maxAgeDays * 24 * 60 * 60 * 1000;
const completedTasks = await this.list({ status: "completed" });
let archivedCount = 0;
for (const task of completedTasks) {
if (task.completedAt && task.completedAt < cutoff) {
await this.archiveTask(task.id);
await this.delete(task.id);
archivedCount++;
}
}
return archivedCount;
}
18.3.3 缓存机制
// 依赖图计算结果缓存
class DependencyGraphCache {
#cache = new Map<string, DependencyGraph>();
#ttl = 30000; // 30秒
get(key: string): DependencyGraph | undefined {
const entry = this.#cache.get(key);
if (entry && Date.now() - entry.timestamp < this.#ttl) {
return entry.graph;
}
this.#cache.delete(key);
return undefined;
}
set(key: string, graph: DependencyGraph): void {
this.#cache.set(key, { graph, timestamp: Date.now() });
}
invalidate(pattern: string): void {
for (const key of this.#cache.keys()) {
if (key.startsWith(pattern)) {
this.#cache.delete(key);
}
}
}
}
18.4 性能监控仪表板
// 性能监控面板
export interface PerformanceDashboard {
tasks: {
total: number;
byStatus: Record<TaskStatus, number>;
};
dependencies: {
total: number;
cycles: number;
};
performance: {
loadTimeMs: number;
graphBuildTimeMs: number;
writeLatencyMs: number;
readLatencyMs: number;
};
concurrency: {
current: number;
peak: number;
limit: number;
};
}
文档最后更新日期:2026-05-21 补充了 API 兼容性、配置迁移指南、监控指标和性能基准等章节
附录:相关文件路径
oh-my-pi 当前任务系统文件
| 文件 | 路径 | 说明 |
|---|---|---|
| todo-write 工具 | packages/coding-agent/src/tools/todo-write.ts |
会话级任务列表管理,支持 Markdown 双向转换 |
| task 工具主文件 | packages/coding-agent/src/task/index.ts |
子代理并行执行框架,包含计划模式集成和模型覆盖机制 |
| task 工具类型 | packages/coding-agent/src/task/types.ts |
Zod v4 类型定义,包含 AgentProgress、SingleResult 等核心类型 |
| task 工具执行器 | packages/coding-agent/src/task/executor.ts |
子代理进程内执行引擎,负责事件转发和进度追踪 |
| task 工具渲染器 | packages/coding-agent/src/task/render.ts |
TUI 渲染组件,支持进度、结果和审查发现的可视化 |
| task 工具 prompt | packages/coding-agent/src/prompts/tools/task.md |
工具描述文档 |
s07 持久化任务图新增文件(计划)
| 文件 | 路径 | 说明 |
|---|---|---|
| 任务图适配器 | packages/coding-agent/src/task/task-graph-adapter.ts |
桥接新旧任务系统,支持数据转换和同步 |
| 智能任务路由器 | packages/coding-agent/src/task/task-router.ts |
根据任务复杂度自动选择工具 |
| 统一任务工具 | packages/coding-agent/src/tools/task.ts |
统一的任务创建入口,支持自动路由 |
| 任务持久化层 | packages/coding-agent/src/task/persistence.ts |
负责读写 .tasks/task_{id}.json 文件 |
| 依赖管理器 | packages/coding-agent/src/task/dependency-manager.ts |
管理任务依赖关系和自动解锁逻辑 |
learn-claude-code 参考文件
| 文件 | 路径 |
|---|---|
| s07 实现 | agents/s07_task_system.py |
| s07 文档 (中文) | docs/zh/s07-task-system.md |
| s03 实现 | agents/s03_todo_write.py |
| s04 实现 | agents/s04_subagent.py |
oh-my-pi 测试文件
| 文件 | 路径 | 说明 |
|---|---|---|
| todo-write 测试 | packages/coding-agent/test/tools/todo-write.test.ts |
todo-write 工具单元测试 |
| executor 警告测试 | packages/coding-agent/test/task/executor-warnings.test.ts |
执行器警告处理测试 |
| executor 提醒测试 | packages/coding-agent/test/task/executor-subagent-reminders.test.ts |
子代理提醒功能测试 |
| worktree 测试 | packages/coding-agent/test/task/worktree.test.ts |
worktree 隔离模式测试 |
| 渲染测试 | packages/coding-agent/test/task/render-report-finding.test.ts |
报告发现渲染测试 |
| Windows ARM projfs 测试 | packages/coding-agent/test/task/issue-949-windows-arm-projfs.test.ts |
Windows ARM projfs 兼容性测试 |
文档生成日期:2026-05-20