ContextEngine Index 模块详细设计(index.md)
目标:基于
ce_architecture.md的 7/10.6/11.2,给出可直接生成代码的索引模块实现设计。
对齐原则:保持 OpenViking 的实现风格(稳定 ID、分层语义、异步队列/worker、可恢复最终一致),但按 ContextEngine 的对象与接口契约落地。
1. 模块职责与边界
1.1 职责
index 模块负责把 ContextFS 的主存节点异步投影到向量索引副本,保证:
- 主存成功即 commit 成功(不阻塞写链路)。
- 索引通过 Outbox 最终一致。
- 事件可重放、可重试、可修复(repair)。
- 事件处理幂等,可横向扩展多个 worker。
1.2 边界
index读取:ContextFS:节点三层文本与 metadataRelationStore:关系边(用于UPSERT_RELATION)OutboxStore:待处理事件
index写入:VectorIndex:向量副本 upsert/delete/moveOutboxStore:事件状态变更(done/failed/retry)
index不负责:- 主存事务提交(
commit模块) - 查询规划/检索编排(
retrieval模块)
- 主存事务提交(
1.3 模块概述(Module Overview)
- 模块名称(唯一):
ce.index - 一句话职责:消费 Outbox 事件,把
ContextFS主存异步投影为可检索的向量索引副本,并保证最终一致。 - 外部依赖(建议实现):
- 存储:
ContextFS、RelationStore、Outbox 持久层(SQL/KV) - 模型:
Embedder(批量文本向量化) - 向量库:
VectorIndex适配器 - 基础库:
hashlib/md5、datetime、json、并发原语(线程池/async)
- 与其他模块接口依赖:
commit -> outbox.append_batch(...)index worker -> fs/relation/vector/embedderretrieval -> vector.search(...)
1.4 输入/输出(I/O)
- 输入(核心):
OutboxEvent[](pending或过期claimed)ContextNode(来自ContextFS.read_node)RelationEdge[](来自RelationStore.get_edges)
- 输入格式:
- 事件与对象均为结构化对象;跨进程传输时使用 JSON。
- 输出(核心):
- 向量索引写入副作用:
upsert/delete/move - 事件状态更新:
done/retry_wait/dead - 批次结果:
IndexBatchResult{claimed,succeeded,failed,retried}
- 输出格式:
- 接口返回对象(内存)
- 日志与监控指标(结构化文本 + metrics)
1.5 设计目标与约束(量化)
以下为 v1 推荐目标值,可按部署规模调整。
- 性能目标:
run_once(limit=100):p95 < 2s(不含外部 embedding/向量库长尾超时)- 单 worker 吞吐:>= 50 events/s(中等文本、批量 embedding 开启)
claim_batch:p95 < 50ms
- 资源约束:
- 单 worker 内存预算:<= 512MB
- 批次内文本总长度上限:默认 1MB(超过拆批)
- CPU 预算:默认 1~2 核可稳定运行
- 一致性约束:
- 主存成功即提交成功
- 索引副本允许短暂落后,但可重试收敛到最终一致
- 所有写操作必须幂等
1.6 架构图
1.6.1 Index 模块在整体架构中的位置
flowchart LR
A[commit 模块\nCommitCoordinator / ContextWriter / RelationWriter]
B[OutboxStore\nappend / append_batch]
C[index 模块\nIndexWorker / IndexProjector / RecordBuilder]
D[ContextFS\nread_node / read_level / stat_node]
E[RelationStore\nget_edges]
F[Embedder\nembed_texts]
G[VectorIndex\nupsert / delete_by_uri / move_uri]
H[retrieval 模块\nL0Retriever / HierarchicalSearcher]
A -->|登记 OutboxEvent| B
B -->|claim_batch| C
C -->|读取主存节点| D
C -->|读取关系边| E
C -->|文本向量化| F
C -->|写入索引副本| G
H -->|search(level=0/1/2)| G
1.6.2 Index 模块内部组件关系
flowchart TB
subgraph IndexModule[index 模块]
W[IndexWorker]
O[OutboxStore]
P[IndexProjector]
R[RecordBuilder]
J[RepairJob]
EC[ErrorClassifier]
end
FS[ContextFS]
RS[RelationStore]
EM[Embedder]
VI[VectorIndex]
W -->|claim / mark_done / mark_failed| O
W -->|读取节点| FS
W -->|读取关系| RS
W -->|构建投影| P
P -->|构建 L0/L1/L2 记录| R
W -->|错误分类| EC
W -->|embed_texts| EM
W -->|upsert / delete / move| VI
J -->|requeue / reconcile| O
J -->|扫描主存| FS
J -->|校验副本| VI
1.6.3 ContextNode 到 IndexRecord 的投影关系
flowchart LR
N[ContextNode\nuri / parent_uri / context_type / category\nabstract / overview / content / metadata]
A0[".abstract.md"]
A1[".overview.md"]
A2["content.md"]
R0[IndexRecord L0\nid=stable_hash(uri,0)]
R1[IndexRecord L1\nid=stable_hash(uri,1)]
R2[IndexRecord L2\nid=stable_hash(uri,2)]
N --> A0
N --> A1
N --> A2
A0 --> R0
A1 --> R1
A2 --> R2
1.6.4 Outbox 到向量副本的处理主线
sequenceDiagram
participant Commit as commit 模块
participant Outbox as OutboxStore
participant Worker as IndexWorker
participant FS as ContextFS/RelationStore
participant Projector as IndexProjector
participant Embedder as Embedder
participant Vector as VectorIndex
Commit->>Outbox: append_batch(events)
Worker->>Outbox: claim_batch(worker_id, limit)
Outbox-->>Worker: OutboxEvent[]
Worker->>Worker: handle_event(event, ctx)
Worker->>FS: read_node(uri) / get_edges(uri)
FS-->>Worker: ContextNode / RelationEdge[]
Worker->>Projector: build_context_records(...) / build_relation_records(...)
Projector-->>Worker: IndexRecord[]
Worker->>Embedder: embed_texts([record.text...])
Embedder-->>Worker: EmbeddingVector[]
Worker->>Vector: upsert / delete_by_uri / move_uri
Worker->>Outbox: mark_done(event_id)
1.6.5 MOVE_CONTEXT 的迁移语义图
flowchart TD
A[旧 URI 子树\nctx://.../from] --> B[扫描命中的索引记录]
B --> C[替换 uri 前缀\nfrom -> to]
B --> D[替换 parent_uri 前缀\nold_parent -> new_parent]
C --> E[按 new_uri + level 重新计算 id]
D --> E
E --> F[批量 upsert 新记录]
F --> G[删除旧记录]
G --> H[mark_done]
2. 核心对象与数据结构
2.1 IndexRecord
@dataclass
class IndexRecord:
id: str
uri: str
parent_uri: str
context_type: str
category: str
level: int # 0(L0), 1(L1), 2(L2)
text: str # 该层用于 embedding 的原文
filters: dict[str, Any] # 可过滤字段(扁平标量)
metadata: dict[str, Any]
2.1.1 IndexRecord.id = stable_hash(uri, level) 规则(强制)
规范输入(必须参与)
canonical_uri:规范化后的节点 URI(不带层级后缀文件名)。level:0|1|2整数。id_schema_version:默认固定"v1"(用于未来哈希策略升级)。
不参与项(必须明确)
node_version/updated_at/event_id不参与(否则更新会产生新 ID,破坏 upsert 幂等)。text不参与(同 URI 同 level 的内容更新应覆盖同一记录)。parent_uri/context_type/category不参与(属性变更不应变更主键)。
哈希输入串
seed = "{id_schema_version}|{canonical_uri}|{level}"
id = md5_hex(seed) # 32位小写十六进制
说明:采用 MD5 是“确定性键”用途,不用于安全签名;风格与 OpenViking 当前实现保持一致。
URI 规范化(canonical_uri)
- 去前后空白。
- scheme 小写(
ctx://)。 - 移除末尾
/(根 URI 除外)。 - 不拼接
.abstract.md/.overview.md/content.md到uri字段;层级通过level表示。 - 路径大小写保持原样(避免跨平台语义漂移)。
2.2 L0/L1/L2 文本构建规则(IndexRecord.text)
节点目录结构:
{node_uri}/
├── .abstract.md
├── .overview.md
├── content.md
├── .meta.json
└── .relations.json
2.2.1 构建规则
- L0:读取
.abstract.md(缺失则降级到.overview.md前 N 字)。 - L1:读取
.overview.md(缺失则降级到content.md摘要截断)。 - L2:读取
content.md(缺失则空串;空串不写入 L2 记录)。
2.2.2 最小字段集合(filters / metadata)
filters 最小集合(必须可过滤):
{
"uri": "ctx://...",
"parent_uri": "ctx://...",
"context_type": "memory|resource|skill|session",
"category": "profile|preferences|entities|events|cases|patterns|skills|...",
"level": 0,
"owner_space": "user:xxx|agent:xxx|shared",
"account_id": "acc_xxx"
}
metadata 最小集合(建议):
{
"node_version": 12,
"updated_at": "2026-03-14T10:00:00Z",
"content_hash": "sha256:...",
"source_files": [".abstract.md", ".overview.md", "content.md"],
"tags": ["..."],
"trace_id": "..."
}
2.3 OutboxEvent
@dataclass
class OutboxEvent:
event_id: str
event_type: str # UPSERT_CONTEXT/DELETE_CONTEXT/MOVE_CONTEXT/UPSERT_RELATION
uri: str
payload: dict[str, Any]
status: str
retry_count: int
created_at: datetime
2.4 MoveProjection
@dataclass
class MoveProjection:
old_uri: str
new_uri: str
old_parent_uri: str
new_parent_uri: str
2.4.1 实现层附加字段说明
为保持和 ce_architecture.md 的核心对象模型一致,OutboxEvent 与 MoveProjection 在公开契约中只保留最小必需字段。
如实现需要以下附加字段,应视为 outbox_store / index_worker 的内部持久化字段,而不是核心对象模型必选字段:
max_retriesnext_visible_atclaimed_byclaim_tokenclaimed_untillast_errorupdated_atrecursiveaffected_prefix
2.5 字段约束(Data Constraints)
IndexRecord.level:仅允许0|1|2。IndexRecord.id:32位小写十六进制字符串(MD5)。OutboxEvent.event_type:UPSERT_CONTEXT|DELETE_CONTEXT|MOVE_CONTEXT|UPSERT_RELATION。OutboxEvent.status:pending|claimed|retry_wait|done|failed|dead。retry_count:>=0;若实现层启用max_retries,则应满足retry_count <= max_retries。uri/parent_uri:必须是ctx://逻辑 URI;禁止空白字符串。owner_space:user:*|agent:*|shared(部署可扩展,但必须可过滤)。
2.6 示例数据(可直接用于测试)
2.6.1 IndexRecord 示例
{
"id": "d6f4749f66f26ea98f15f6f5f181669c",
"uri": "ctx://acc1/users/u1/memories/preferences/theme",
"parent_uri": "ctx://acc1/users/u1/memories/preferences",
"context_type": "memory",
"category": "preferences",
"level": 0,
"text": "用户偏好深色主题,并倾向简洁界面。",
"filters": {
"uri": "ctx://acc1/users/u1/memories/preferences/theme",
"parent_uri": "ctx://acc1/users/u1/memories/preferences",
"context_type": "memory",
"category": "preferences",
"level": 0,
"owner_space": "user:u1",
"account_id": "acc1"
},
"metadata": {
"node_version": 12,
"updated_at": "2026-03-14T10:00:00Z",
"content_hash": "sha256:abc",
"source_files": [".abstract.md"],
"trace_id": "t-001"
}
}
2.6.2 OutboxEvent 示例
{
"event_id": "evt-001",
"event_type": "UPSERT_CONTEXT",
"uri": "ctx://acc1/users/u1/memories/preferences/theme",
"payload": {
"event_version": 1,
"uri": "ctx://acc1/users/u1/memories/preferences/theme",
"expected_node_version": 12,
"reason": "merge",
"trace_id": "t-001"
},
"status": "pending",
"retry_count": 0,
"created_at": "2026-03-14T10:00:00Z"
}
3. Outbox 设计
3.1 事件类型与 payload
3.1.1 UPSERT_CONTEXT
{
"event_version": 1,
"uri": "ctx://a/users/u/memories/preferences/theme",
"expected_node_version": 12,
"reason": "create|merge|append|repair",
"trace_id": "..."
}
3.1.2 DELETE_CONTEXT
{
"event_version": 1,
"uri": "ctx://a/users/u/memories/events/e123",
"expected_deleted_version": 8,
"recursive": false,
"trace_id": "..."
}
3.1.3 MOVE_CONTEXT
{
"event_version": 1,
"from_uri": "ctx://a/users/u/memories/entities/project-x",
"to_uri": "ctx://a/users/u/memories/entities/project-y",
"recursive": true,
"old_parent_uri": "ctx://a/users/u/memories/entities",
"new_parent_uri": "ctx://a/users/u/memories/entities",
"trace_id": "..."
}
3.1.4 UPSERT_RELATION
{
"event_version": 1,
"uri": "ctx://a/users/u/memories/cases/c1",
"relation_version": 5,
"mode": "replace",
"trace_id": "..."
}
3.2 状态机
pending -> claimed -> done
|-> retry_wait -> pending
|-> failed(不可重试) -> dead(超过最大重试或人工打标)
状态定义:
pending:待消费。claimed:已被某 worker 抢占,租约有效期内。retry_wait:失败后等待重试窗口。done:处理完成(终态)。failed:本次处理失败(中间态,通常马上转retry_wait或dead)。dead:超过max_retries或判定不可重试(终态,等待 repair)。
3.3 重试与死信策略
- 默认
max_retries=8。 - 退避:
min(base * 2^retry_count, max_backoff) + jitter。 - 错误分类:
- 可重试:网络超时、限流、向量库暂不可用、租约冲突。
- 不可重试:payload 缺字段、URI 非法、层级非法。
- 达到
max_retries->dead,由 repair job 扫描恢复。
3.4 OutboxStore 并发安全语义(关键)
3.4.1 claim_batch(worker_id, limit)
语义要求:
- 同一事件在任意时刻最多被一个 worker claim。
- claim 需原子写入:
status=claimed, claimed_by, claim_token, claimed_until。 - 可回收过期租约:
status=claimed && claimed_until < now的事件可再次 claim。
推荐实现:
- SQL 存储:
SELECT ... FOR UPDATE SKIP LOCKED+ 批量UPDATE ... WHERE id IN (...)。 - KV 存储:CAS(compare-and-set)更新版本号。
3.4.2 mark_done(event_id)
语义要求:
- 仅当前持有有效 claim 的 worker 可以 done。
- 幂等:重复调用不报错(已 done 返回成功)。
- 过期 claim 不应覆盖新 claim 的状态。
- 若实现需要
claim_token/claimed_by校验,应作为OutboxStore内部实现细节,不改变10.6的公开接口签名。
3.4.3 mark_failed(event_id, error, retryable=True)
语义要求:
- 仅当前持有有效 claim 的 worker 可写失败结果。
retryable=True且未超重试:转retry_wait并设置next_visible_at。retryable=False或超重试:转dead。- 记录
last_error/error_code/retry_count用于观测和 repair。
3.4.4 租约/超时回收(可选但建议默认开启)
lease_seconds默认 60s。- worker 处理长任务可在实现层心跳续租。
- 无心跳且超时 -> 自动可回收,避免永久卡死。
4. IndexProjector 设计
4.1 职责
将主存对象(ContextNode / RelationEdge[])投影为索引对象(IndexRecord[] / RelationIndexRecord[] / MoveProjection)。
4.2 build_context_records(node) 规则
- 输入:完整
ContextNode(含abstract/overview/content/metadata)。 - 输出:最多 3 条记录(L0/L1/L2)。
- 跳过规则:
- 对应层文本为空则跳过该层。
- 但至少应保留一层(通常 L0)。
- 每条记录使用同一
uri/parent_uri/context_type/category/owner_space/account_id,仅level/text/id不同。
伪代码:
def build_context_records(node: ContextNode) -> list[IndexRecord]:
texts = {
0: pick_l0(node.abstract, node.overview, node.content),
1: pick_l1(node.overview, node.content),
2: pick_l2(node.content),
}
records = []
for level, text in texts.items():
if not text:
continue
uri = canonicalize_uri(node.uri)
records.append(IndexRecord(
id=stable_hash(uri, level),
uri=uri,
parent_uri=canonicalize_uri(node.parent_uri or ""),
context_type=node.context_type,
category=node.category or "",
level=level,
text=text,
filters=build_filters(node, level),
metadata=build_metadata(node, level, text),
))
return records
4.3 build_relation_records(uri, edges) 规则
- relation 索引默认可选(
enable_relation_index=false时返回空)。 - 每条关系边构造稳定 ID:
stable_hash(f"{from_uri}|{to_uri}|{relation_type}", 0)。 - 关系记录至少保留:
from_uri/to_uri/relation_type/weight/reason/account_id/owner_space。
4.4 build_move_records(from_uri, to_uri) 规则
返回 MoveProjection,用于 VectorIndex.move_uri:
old_uri/new_uri:移动根节点 URI。old_parent_uri/new_parent_uri:根节点父 URI。- 子树迁移是否递归处理属于
IndexWorker/VectorIndex的实现策略,不属于MoveProjection核心字段。
5. IndexWorker 设计
5.1 run_once(limit=100) 完整流程(强制)
claim -> 按事件分发 -> 读取 ContextFS/RelationStore
-> IndexProjector -> Embedder.embed_texts -> VectorIndex 写入 -> mark_done
5.1.1 详细步骤
events = outbox.claim_batch(worker_id, limit)。- 遍历事件:
handle_event(event, ctx)。- 成功 ->
mark_done(event_id)。 - 失败 ->
mark_failed(..., retryable=classify(err))。
- 汇总
IndexBatchResult(claimed, succeeded, failed, retried)。
5.2 事件分发逻辑
5.2.1 UPSERT_CONTEXT
node = ContextFS.read_node(ctx, uri)。- 若节点不存在:降级执行
VectorIndex.delete_by_uri(uri)(防止脏残留)。 records = IndexProjector.build_context_records(node)。vectors = Embedder.embed_texts([r.text])批量化。VectorIndex.upsert(records_with_vectors)。
5.2.2 DELETE_CONTEXT
- 可选防误删:先
ContextFS.exists(uri)。 - 若仍存在且版本更新,判定为陈旧删除事件,直接
mark_done(不删索引)。 - 否则
VectorIndex.delete_by_uri(uri)(含各层记录)。
5.2.3 MOVE_CONTEXT
projection = IndexProjector.build_move_records(from_uri, to_uri)。VectorIndex.move_uri(from_uri, to_uri)。- 若向量库不支持原地 move:
scan old -> rekey upsert new -> delete old。
5.2.4 UPSERT_RELATION
edges = RelationStore.get_edges(ctx, uri)。records = IndexProjector.build_relation_records(uri, edges)。- 通过可选的关系副本 provider 同步
records;若未开启关系副本则直接跳过。
5.3 每步失败时的最终一致保证
- claim 后进程崩溃:租约过期后被重新 claim。
- embed 失败:事件重试,主存不受影响。
- vector upsert 成功但 mark_done 失败:事件会重试;由于
id=stable_hash(uri, level),重复 upsert 为覆盖写,幂等成立。 - delete 重试:重复 delete 为 no-op,幂等。
- move 重试:
move_uri需设计为幂等(源不存在且目标已存在视为成功)。
5.4 核心伪代码(含分支与循环)
def run_once(limit: int = 100) -> IndexBatchResult:
result = IndexBatchResult(claimed=0, succeeded=0, failed=0, retried=0)
events = outbox.claim_batch(worker_id=self.worker_id, limit=limit)
result.claimed = len(events)
for event in events: # 循环处理批次事件
try:
handle_event(event, ctx)
outbox.mark_done(event.event_id)
result.succeeded += 1
except RetryableIndexError as e:
outbox.mark_failed(
event.event_id,
error=str(e),
retryable=True,
)
result.retried += 1
except NonRetryableIndexError as e:
outbox.mark_failed(
event.event_id,
error=str(e),
retryable=False,
)
result.failed += 1
except Exception as e:
# 未知错误默认按可重试处理,避免消息丢失
outbox.mark_failed(
event.event_id,
error=f"unknown:{e}",
retryable=True,
)
result.retried += 1
return result
5.5 算法细节与边界情况
- 条件分支:
UPSERT_CONTEXT:节点不存在时降级为delete_by_uri。DELETE_CONTEXT:若主存已出现更高版本节点,判定为陈旧删除事件并忽略删除。MOVE_CONTEXT:若from_uri == to_uri,直接幂等成功。
- 循环逻辑:
- 批次事件按 claim 顺序遍历。
- embedding 与 upsert 可在事件内做子批处理循环(按
batch_size分片)。
- 边界情况:
- 空文本:对应层记录跳过,不调用 embedding。
- 超长文本:按 token/字节分段或截断(配置驱动)。
- 非法 level:直接抛
NonRetryableIndexError。
- 模块内部状态管理:
- 业务状态以 Outbox 状态机为准(无本地内存状态机依赖)。
- worker 仅持有短期运行态:
worker_id、inflight_count、心跳续租计时器。 - 缓存策略(可选):
uri -> node_stat/version的短 TTL 缓存,用于减少重复读。
6. VectorIndex 适配设计
6.1 必选接口(对齐 10.6)
class VectorIndex(Protocol):
def upsert(self, records: list[IndexRecordWithVector]) -> None: ...
def delete_by_uri(self, uri: str) -> None: ...
def move_uri(self, from_uri: str, to_uri: str) -> None: ...
def search(
self,
query_vector: list[float],
filters: dict[str, Any],
top_k: int,
level: int = 0,
) -> list[VectorHit]: ...
6.2 可选扩展接口
class VectorIndex(Protocol):
def upsert_relations(self, records: list[RelationIndexRecord]) -> None: ...
def delete_relations_by_uri(self, uri: str) -> int: ...
6.3 move_uri(from_uri, to_uri) 语义(强制)
6.3.1 语义目标
- 更新所有
uri以from_uri前缀开头的记录为to_uri前缀。 - 同步更新
parent_uri:
- 根节点:
parent_uri = parent(to_uri)。 - 子节点:
parent_uri也做前缀替换。
- 重新计算每条记录 ID:
new_id = stable_hash(new_uri, level)。
6.3.2 实现策略
- 扫描命中集合(按前缀过滤)。
- 对每条记录生成新记录(URI/parent_uri/filters/id 更新,向量保持不变)。
- 批量 upsert 新记录。
- 批量删除旧 ID(或旧 URI)。
- 全流程可重入:若已迁移过,再执行应视为成功且不抛异常。
7. 幂等与一致性
7.1 幂等不变量
UPSERT_CONTEXT:同uri+level永远映射同id。DELETE_CONTEXT:可重复执行,删除不存在记录不报错。MOVE_CONTEXT:重复执行不产生额外副作用。mark_done/mark_failed:重复调用不应破坏最终状态;若实现层启用租约校验,应防止过期 worker 误写状态。
7.2 一致性策略(最终一致)
- 主存是 source of truth。
- 索引是可重建副本。
- worker 处理时尽量读“当前主存状态”,天然吸收乱序事件影响。
7.2.1 乱序场景处理
DELETE先于旧UPSERT到达:UPSERT时发现节点不存在,执行 delete 收敛。UPSERT先于MOVE到达:MOVE会重写 URI 映射,最终收敛。MOVE后又来旧UPSERT(from_uri):读取主存发现from_uri不存在,转 delete 或忽略,最终收敛到to_uri。
8. 失败重试与 repair
8.1 错误分类
RetryableError:超时、限流、临时网络故障、租约冲突、向量库 5xx。NonRetryableError:参数校验失败、URI 非法、事件类型不支持、数据损坏。
8.2 重试参数
index:
outbox:
max_retries: 8
retry_base_seconds: 2
retry_max_seconds: 300
retry_jitter_ratio: 0.2
8.3 dead-letter 与 repair job
8.3.1 Dead-letter
事件进入 dead 条件:
retry_count >= max_retries。retryable=False的不可恢复错误。
8.3.2 Repair Job(建议实现两类)
repair_dead_events:
- 扫描
status=dead事件。 - 支持
requeue(重置为pending)和force_done(人工确认)。
repair_reconcile_index:
- 扫描
ContextFS全量或增量 URI。 - 对比索引缺失/多余记录,自动补发
UPSERT_CONTEXT/DELETE_CONTEXT事件。
8.4 错误处理矩阵(Error Handling)
| 错误类型 | 典型触发条件 | 处理策略 |
|---|---|---|
EmbedTimeoutError |
embedding 请求超时 | mark_failed(retryable=True),指数退避 |
EmbedRateLimitError |
embedding 限流 | retryable=True,并触发降速/降并发 |
VectorIndexUnavailableError |
向量库不可用/5xx | retryable=True |
VectorSchemaMismatchError |
字段缺失或类型不匹配 | retryable=False,进入 dead |
InvalidUriError |
URI 非法或不可解析 | retryable=False |
OutboxClaimConflictError |
claim_token 冲突或租约失效 | 记录告警,当前事件按失败重试 |
ContextNotFoundError |
UPSERT 时主存节点缺失 | 降级 delete_by_uri 后 done |
UnknownError |
未分类异常 | 默认 retryable=True,防止消息丢失 |
9. 并发模型与性能
9.1 并发模型
- 多 worker 并行,Outbox claim 去重。
- 单 worker 内可批处理 embedding 与 upsert。
- 避免同事件并发:依赖 claim + lease + token。
9.2 性能建议
claim_batch默认 100(可配置)。embed_batch_size默认 32(按模型吞吐调优)。vector_upsert_batch_size默认 128。- 对同 URI 的多事件可做“批内压缩”:
UPSERT + UPSERT合并为最后一个。UPSERT + DELETE合并为DELETE。MOVE + UPSERT(old_uri)转换为UPSERT(new_uri)。
9.3 背压策略
- 当 embedder/vector 延迟升高时降低 batch。
- 当 outbox 堆积超阈值时提高 worker 副本数。
- 设
max_inflight_events防止内存失控。
10. 关键时序
10.1 UPSERT_CONTEXT
IndexWorker.run_once
-> OutboxStore.claim_batch
-> handle_event(UPSERT_CONTEXT)
-> ContextFS.read_node
-> IndexProjector.build_context_records
-> Embedder.embed_texts
-> VectorIndex.upsert
-> OutboxStore.mark_done
10.2 DELETE_CONTEXT
IndexWorker.run_once
-> claim_batch
-> handle_event(DELETE_CONTEXT)
-> ContextFS.exists (optional anti-stale check)
-> VectorIndex.delete_by_uri
-> mark_done
10.3 MOVE_CONTEXT
IndexWorker.run_once
-> claim_batch
-> handle_event(MOVE_CONTEXT)
-> IndexProjector.build_move_records
-> VectorIndex.move_uri(from_uri, to_uri)
(update uri + parent_uri + id rekey)
-> mark_done
10.4 UPSERT_RELATION
IndexWorker.run_once
-> claim_batch
-> handle_event(UPSERT_RELATION)
-> RelationStore.get_edges
-> IndexProjector.build_relation_records
-> relation index adapter / optional provider sync
-> mark_done
11. 接口定义(可直接映射代码)
与
10.6 Index API保持一致,并补齐参数/返回/错误。
11.1 OutboxStore
class OutboxStore(Protocol):
def append(self, event: OutboxEventCreate) -> OutboxEvent: ...
def append_batch(self, events: list[OutboxEventCreate]) -> list[OutboxEvent]: ...
def claim_batch(self, worker_id: str, limit: int) -> list[OutboxEvent]: ...
def mark_done(self, event_id: str) -> None: ...
def mark_failed(self, event_id: str, error: str, retryable: bool = True) -> None: ...
错误:
OutboxClaimConflictErrorOutboxEventNotFoundErrorOutboxStateTransitionErrorOutboxStorageError
实现说明:
- 若实现需要
lease_seconds / claim_token / claimed_by / next_visible_at,可作为outbox_store_sql.py等实现层的私有字段或私有方法。 - 对外暴露的核心契约保持与
ce_architecture.md的10.6 Index API一致。
11.2 IndexProjector
class IndexProjector(Protocol):
def build_context_records(self, node: ContextNode) -> list[IndexRecord]: ...
def build_relation_records(
self,
uri: str,
edges: list[RelationEdge],
) -> list[RelationIndexRecord]: ...
def build_move_records(self, from_uri: str, to_uri: str) -> MoveProjection: ...
错误:
InvalidNodeErrorInvalidUriErrorProjectionError
11.3 Embedder
class Embedder(Protocol):
def embed_texts(
self,
texts: list[str],
model: str | None = None,
timeout_seconds: int | None = None,
) -> list[EmbeddingVector]: ...
错误:
EmbedTimeoutErrorEmbedRateLimitErrorEmbedProviderError
11.4 VectorIndex
class VectorIndex(Protocol):
def upsert(self, records: list[IndexRecordWithVector]) -> None: ...
def delete_by_uri(self, uri: str) -> None: ...
def move_uri(self, from_uri: str, to_uri: str) -> None: ...
def search(
self,
query_vector: list[float],
filters: dict[str, Any],
top_k: int,
level: int = 0,
) -> list[VectorHit]: ...
错误:
VectorIndexUnavailableErrorVectorWriteConflictErrorVectorSchemaMismatchErrorVectorQueryError
实现说明:
upsert_relations(...)、递归删除、迁移统计等可作为实现层扩展能力,不纳入10.6的核心公共接口。MOVE_CONTEXT所需的递归 URI 迁移逻辑由VectorIndex.move_uri(from_uri, to_uri)的实现自行完成。
11.5 IndexWorker
class IndexWorker:
def __init__(
self,
worker_id: str,
outbox: OutboxStore,
context_fs: ContextFS,
relation_store: RelationStore,
projector: IndexProjector,
embedder: Embedder,
vector_index: VectorIndex,
config: IndexWorkerConfig,
logger: Logger,
) -> None: ...
def run_once(self, limit: int = 100) -> IndexBatchResult: ...
def handle_event(self, event: OutboxEvent, ctx: RequestContext) -> None: ...
错误(handle_event 抛出,由 run_once 分类):
RetryableIndexErrorNonRetryableIndexError
11.6 事件/回调约定
- 模块内事件源:Outbox 事件(非实时回调协议,采用轮询 claim)。
- 可选回调钩子(便于测试与观测):
on_event_start(event: OutboxEvent) -> Noneon_event_success(event: OutboxEvent, latency_ms: int) -> Noneon_event_failure(event: OutboxEvent, error: Exception, retryable: bool) -> None
- 约束:
- 回调异常不得影响主流程;必须吞掉并记日志。
- 回调应为“旁路”行为,不得写主存。
11.7 接口示例(便于 Agent 生成代码)
# 1) worker 拉取并处理一轮
worker = IndexWorker(
worker_id="index-worker-1",
outbox=outbox_store,
context_fs=context_fs,
relation_store=relation_store,
projector=index_projector,
embedder=embedder,
vector_index=vector_index,
config=IndexWorkerConfig(claim_limit=100),
logger=logger,
)
batch = worker.run_once(limit=100)
# 2) 事件写入(commit 侧)
outbox_store.append_batch([
OutboxEventCreate(
event_type="UPSERT_CONTEXT",
uri="ctx://acc1/users/u1/memories/preferences/theme",
payload={"event_version": 1, "uri": "ctx://acc1/users/u1/memories/preferences/theme"},
)
])
12. 配置项与可观测性
12.1 配置项建议
index:
worker:
enabled: true
poll_interval_ms: 500
claim_limit: 100
lease_seconds: 60
heartbeat_seconds: 20
max_inflight_events: 200
outbox:
max_retries: 8
retry_base_seconds: 2
retry_max_seconds: 300
retry_jitter_ratio: 0.2
dead_letter_enabled: true
embedding:
model: "text-embedding-3-large"
batch_size: 32
timeout_seconds: 30
vector:
upsert_batch_size: 128
relation_index_enabled: false
repair:
dead_event_scan_interval_minutes: 10
reconcile_interval_minutes: 60
12.2 指标(Metrics)
index_outbox_pending_totalindex_outbox_claimed_totalindex_outbox_dead_totalindex_worker_run_once_latency_msindex_worker_event_success_total{event_type}index_worker_event_failed_total{event_type,error_code}index_embed_latency_msindex_vector_upsert_latency_msindex_repair_requeued_total
12.3 日志与追踪
- 日志字段最小集:
trace_id,event_id,event_type,uri,worker_id,retry_count。 - 若实现层启用租约/claim token,可作为附加日志字段输出。
- 对每个事件输出
start/success/fail三类结构化日志。 - Trace span 建议:
index.claim_batchindex.handle_eventindex.embedindex.vector_writeindex.outbox_mark_done
12.4 参数范围与行为影响
| 参数 | 默认值 | 建议范围 | 影响 |
|---|---|---|---|
worker.claim_limit |
100 | 10~500 | 越大吞吐越高,但单轮延迟与内存占用上升 |
worker.lease_seconds |
60 | 15~300 | 过短易误回收,过长影响故障恢复速度 |
outbox.max_retries |
8 | 3~20 | 越大越不易丢事件,但死信收敛更慢 |
outbox.retry_base_seconds |
2 | 1~10 | 初始重试速度 |
outbox.retry_max_seconds |
300 | 30~1800 | 长尾失败的最大等待 |
embedding.batch_size |
32 | 8~128 | 影响 embedding 吞吐与超时风险 |
embedding.timeout_seconds |
30 | 5~120 | 过小易误超时,过大拖慢失败反馈 |
vector.upsert_batch_size |
128 | 16~512 | 影响写入吞吐与事务大小 |
worker.max_inflight_events |
200 | 50~2000 | 并发上限,受内存与下游容量限制 |
13. 测试策略
13.1 单元测试
stable_hash:同输入恒定、不同 level 不同 ID、URI 规范化一致。build_context_records:L0/L1/L2 构建与降级逻辑。OutboxStore状态迁移:pending->claimed->done、重试与 dead。move_uri:uri/parent_uri/id同步迁移与幂等。
13.2 集成测试
run_once成功链路:claim 到 done 全流程。- embed 失败重试:验证 backoff 与最终 done。
- vector 写成功但 mark_done 失败:重复 upsert 不重复脏数据。
- 并发 claim:多 worker 下事件不重复消费。
- 租约超时回收:worker 崩溃后事件可继续处理。
13.3 端到端测试
- commit 产生 outbox -> worker 索引 -> search 可召回。
- delete/move 后索引一致性验证。
- dead-letter + repair requeue 后恢复。
13.4 可靠性测试(建议)
- 故障注入:embedder 超时、vector 5xx、outbox DB 暂时不可用。
- 压测:高并发 append + 多 worker 消费。
- 恢复:进程重启/宕机后数据不丢失、可继续消费。
13.5 可直接生成单元测试的样例数据
13.5.1 stable_hash 用例
[
{
"name": "same_input_same_id",
"input": {"uri": "ctx://acc1/users/u1/memories/preferences/theme", "level": 0},
"expect": {"equal_to_case": "same_input_same_id_repeat"}
},
{
"name": "same_input_same_id_repeat",
"input": {"uri": "ctx://acc1/users/u1/memories/preferences/theme", "level": 0},
"expect": {"id_non_empty": true}
},
{
"name": "different_level_different_id",
"input": {"uri": "ctx://acc1/users/u1/memories/preferences/theme", "level": 1},
"expect": {"not_equal_to_case": "same_input_same_id"}
}
]
13.5.2 build_context_records 输入输出
{
"node": {
"uri": "ctx://acc1/users/u1/memories/preferences/theme",
"parent_uri": "ctx://acc1/users/u1/memories/preferences",
"context_type": "memory",
"category": "preferences",
"owner_space": "user:u1",
"account_id": "acc1",
"abstract": "用户偏好深色主题。",
"overview": "用户在多个会话中明确偏好深色主题,且不喜欢高亮背景。",
"content": "原始内容...",
"metadata": {"node_version": 12}
},
"expect": {
"record_count": 3,
"levels": [0, 1, 2],
"all_have_fields": ["id", "uri", "parent_uri", "context_type", "category", "level", "text", "filters", "metadata"]
}
}
13.5.3 边界/异常用例
abstract/overview/content全空:期望record_count=0,并产生可观测 warning。- 非法
level=9:期望抛出NonRetryableIndexError。 - 若实现层启用租约校验:过期 claim 不得成功执行
mark_done。 MOVE_CONTEXT from_uri==to_uri:期望直接成功且不抛异常。
14. 建议目录结构(可直接映射到代码文件/类)
src/
index/
__init__.py
types.py # IndexRecord/OutboxEvent/MoveProjection/IndexBatchResult
stable_hash.py # canonicalize_uri + stable_hash
record_builder.py # L0/L1/L2 text构建 + filters/metadata
index_projector.py # IndexProjectorImpl
outbox_store.py # OutboxStore协议
outbox_store_sql.py # SQL实现(含claim/lease/CAS)
index_worker.py # IndexWorker.run_once/handle_event
error_classifier.py # retryable/non-retryable分类
repair_job.py # dead-letter重放 + reconcile
vector_index_adapter.py # VectorIndex实现适配
relation_index_adapter.py # (可选)关系副本适配
metrics.py # 指标定义
建议类与关键方法:
StableHash
canonicalize_uri(uri: str) -> strstable_hash(uri: str, level: int, id_schema_version: str = "v1") -> str
RecordBuilder
build_level_texts(node: ContextNode) -> dict[int, str]build_filters(node: ContextNode, level: int) -> dict[str, Any]build_metadata(node: ContextNode, level: int, text: str) -> dict[str, Any]
IndexProjectorImpl
build_context_records(node: ContextNode) -> list[IndexRecord]build_relation_records(uri: str, edges: list[RelationEdge]) -> list[RelationIndexRecord]build_move_records(from_uri: str, to_uri: str) -> MoveProjection
SqlOutboxStore
claim_batch(worker_id: str, limit: int) -> list[OutboxEvent]mark_done(event_id: str) -> Nonemark_failed(event_id: str, error: str, retryable: bool = True) -> None_extend_lease(...)-> bool # 可选私有实现细节
IndexWorker
run_once(limit: int = 100) -> IndexBatchResulthandle_event(event: OutboxEvent, ctx: RequestContext) -> None_handle_upsert_context(...) -> None_handle_delete_context(...) -> None_handle_move_context(...) -> None_handle_upsert_relation(...) -> None
VectorIndexAdapter
upsert(records: list[IndexRecordWithVector]) -> Nonedelete_by_uri(uri: str) -> Nonemove_uri(from_uri: str, to_uri: str) -> Nonesearch(query_vector: list[float], filters: dict[str, Any], top_k: int, level: int = 0) -> list[VectorHit]
15. 默认实现决策(v1)
id_schema_version固定v1,hash 算法md5。MOVE_CONTEXT默认recursive=true。UPSERT_RELATION默认开启事件写入,关系向量索引默认关闭(可配置开启)。- worker 以同步接口建模,内部可用 async 并行 embedding/vector 写入。
16. 覆盖对照(对应评审清单)
| 评审项 | 覆盖位置 |
|---|---|
| 1. 模块概述(名称/职责/依赖/I/O/目标约束) | 1.3/1.4/1.5 |
| 2. 接口定义(参数/返回/异常/事件/示例) | 11.1~11.7 |
| 3. 数据结构(定义/字段/约束/示例) | 2.1~2.6 |
| 4. 逻辑与算法(流程/分支/循环/状态) | 5.1~5.5、10.1~10.4 |
| 5. 错误处理(类型/触发/策略) | 8.1、8.4 |
| 6. 配置参数(默认/影响/范围约束) | 12.1、12.4 |
| 7. 测试指导(测试点/样例/边界异常) | 13.1~13.5 |