oGMemory 异步索引同步设计
版本: v2.0
日期: 2026-03-20
状态: 设计稿
1. 概述
本文档描述 oGMemory 异步索引同步的设计方案,采用调度器定期轮询 + 软锁并发控制模式,复用现有的 OutboxWorker.run_once() 实现。
1.1 设计目标
- 异步解耦: 写入路径与索引更新解耦,不影响主流程性能
- 可靠处理: 事件持久化到文件系统,支持故障恢复
- 并发安全: 通过
.processing软锁实现多 Worker 并发抢占 - 简单可靠: 复用现有组件,最小化新增代码
1.2 现有组件复用
| 组件 | 文件 | 说明 |
|---|---|---|
OutboxStore |
commit/outbox_store.py |
事件持久化,已实现 |
OutboxWorker |
index/outbox_worker.py |
核心处理逻辑,已实现 |
run_once() |
index/outbox_worker.py |
单轮消费循环,已实现 |
2. 架构设计
2.1 整体架构
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ oGMemory 异步索引同步架构 │
└─────────────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ 写入路径 (已实现) │
├─────────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ContextWriter.write_candidate() │
│ │ │
│ ▼ │
│ OutboxStore.register_write() │
│ │ │
│ ▼ │
│ ┌─────────────────────────────┐ │
│ │ .outbox/{event_id}.json │ ◄─── 持久化到AGFS文件系统 │
│ │ { │ │
│ │ "event_id": "uuid", │ │
│ │ "event_type": "UPSERT", │ │
│ │ "status": "PENDING", │ │
│ │ "payload": {...} │ │
│ │ } │ │
│ └─────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────────┘
│
│ 定时触发
▼
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ 调度器 (需实现) │
├─────────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────────────────────┐ │
│ │ APScheduler │ │
│ │ │ │
│ │ 配置: │ │
│ │ - trigger: interval, seconds=30 │ │
│ │ - max_instances: 1 (防止并发执行) │ │
│ │ - misfire_grace_time: 10 │ │
│ │ │ │
│ │ 调用: │ │
│ │ worker.run_once( │ │
│ │ outbox_store=store, │ │
│ │ account_ids=get_active_account_ids(), │ │
│ │ worker_id="worker-1" │ │
│ │ ) │ │
│ │ │ │
│ └───────────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ run_once() 内部流程 (已实现) │
├─────────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 1. list_pending(account_id) │ │
│ │ └── 扫描 .outbox/ 目录,自动过滤有活跃 .processing 锁的事件 │ │
│ │ │ │
│ │ 2. try_acquire(event, node_uri, worker_id) │ │
│ │ └── 写入 .processing 软锁文件 │ │
│ │ └── 已被其他 Worker 占用则跳过 │ │
│ │ │ │
│ │ 3. process_event(event) │ │
│ │ └── 调 Embedder 生成向量 │ │
│ │ └── 调 VectorIndex.upsert 写入 │ │
│ │ │ │
│ │ 4. 结果路由: │ │
│ │ ├── 成功 → mark_done │ │
│ │ ├── 可重试 → increment_retry + release │ │
│ │ └── 超限 → move_to_dlq │ │
│ │ │ │
│ │ 5. 异常时自动 release 释放锁 │ │
│ │ │ │
│ └───────────────────────────────────────────────────────────────────────────┘ │
│ │
│ 返回: {"processed": 12, "succeeded": 10, "failed": 1, │
│ "moved_to_dlq": 0, "skipped": 1} │
│ │
└─────────────────────────────────────────────────────────────────────────────────────┘
2.2 多 Worker 并发模型
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ 多 Worker 并发处理 │
├─────────────────────────────────────────────────────────────────────────────────────┤
│ │
│ 每次写入操作生成一个新的 event_id,一个节点可能有多个待处理事件 │
│ │
│ ┌───────────────────────────────────────────────────────────────────────────┐ │
│ │ 节点 A: profile/.outbox/ 节点 B: settings/.outbox/ │ │
│ │ │ │
│ │ .outbox/ .outbox/ │ │
│ │ ├── a1b2c3d4.json ├── e5f6g7h8.json │ │
│ │ ├── a1b2c3d4.processing ◄─ W1 ├── e5f6g7h8.processing ◄─ W2 │ │
│ │ ├── i9j0k1l2.json └── m3n4o5p6.json │ │
│ │ └── i9j0k1l2.processing ◄─ W2 (无锁,可被抢占) │ │
│ │ │ │
│ └───────────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Worker-1 │ │ Worker-2 │ │
│ │ (Thread-1) │ │ (Thread-2) │ │
│ │ │ │ │ │
│ │ 扫描所有 .outbox/ │ │ 扫描所有 .outbox/ │ │
│ │ │ │ │ │
│ │ try_acquire(a1b2) │ │ try_acquire(e5f6) │ │
│ │ → 成功,处理中 │ │ → 成功,处理中 │ │
│ │ │ │ │ │
│ │ try_acquire(i9j0) │ │ try_acquire(i9j0) │ │
│ │ → 失败,跳过 │ │ → 成功,处理中 │ │
│ │ (已被 W2 锁定) │ │ │ │
│ │ │ │ │ │
│ └─────────────────────┘ └─────────────────────┘ │
│ │
│ 锁粒度: 每个 event_id 独立加锁,同一节点的多个事件可被不同 Worker 并行处理 │
│ 锁超时: .processing 文件超过 300 秒后自动失效,任何 Worker 都可以重新获取 │
│ │
└─────────────────────────────────────────────────────────────────────────────────────┘
3. Payload 格式约定
每个事件文件的格式固定如下:
{
"event_id": "uuid-v4",
"event_type": "UPSERT_CONTEXT",
"uri": "ctx://{account}/users/{user}/memories/profile",
"payload": {
"records": [
{
"id": "sha256(uri:level)[:16]",
"uri": "ctx://...",
"level": 0,
"text": "一句话摘要文本",
"filters": {
"account_id": "acme-corp",
"owner_space": "user_space_alice"
},
"metadata": { "category": "profile", "context_type": "MEMORY" }
},
{ "level": 1, "text": "结构化概述..." },
{ "level": 2, "text": "完整正文..." }
]
},
"status": "PENDING",
"retry_count": 0,
"created_at": "2026-03-20T10:00:00+00:00"
}
每个节点最多产生 3 条 record(L0 摘要 / L1 概述 / L2 正文)。id 字段由 sha256(uri:level) 稳定生成,同一节点同一层级的 id 永远相同,VectorIndex 的 upsert 天然幂等。
4. AGFS 目录结构
ctx://{account}/users/{user}/memories/profile/
├── .abstract.md ← L0 文本来源
├── .overview.md ← L1 文本来源
├── content.md ← L2 文本来源
├── .meta.json ← status=ACTIVE 表示节点写入完整
├── .relations.json
└── .outbox/
├── a1b2c3d4.json ← PENDING 事件文件 (event_id = a1b2c3d4)
├── a1b2c3d4.processing ← 软锁文件(Worker 写入)
├── e5f6g7h8.json ← 另一个事件(同一节点多次写入)
└── dlq/
└── i9j0k1l2.json ← 超过最大重试次数后移入
说明:
- 每次写入操作生成一个新的
event_id(UUID),因此一个节点可能有多个待处理事件 - 锁文件命名:
{event_id}.processing,每个事件独立加锁 - 同一节点的多个事件可被不同 Worker 并行处理(因为 VectorIndex.upsert 是幂等的)
5. 已实现 API
以下全部已实现,直接导入使用即可:
from commit.outbox_store import OutboxStore
from index.outbox_worker import OutboxWorker
worker = OutboxWorker(vector_index=your_index, embedder=your_embedder)
store = OutboxStore(client=agfs_client)
# 单轮轮询——从调度器里调用这个
stats = worker.run_once(
outbox_store=store,
account_ids=["acme-corp", "other-tenant"],
worker_id="worker-1", # 每个线程用唯一的 worker_id
)
# 返回:{"processed": 12, "succeeded": 10, "failed": 1,
# "moved_to_dlq": 0, "skipped": 1}
6. 需交付内容
6.1 调度器
调度器负责定期触发 run_once(),是整个异步索引同步的入口。
6.1.1 核心配置
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED
scheduler = BackgroundScheduler()
scheduler.add_job(
func=lambda: worker.run_once(store, get_active_account_ids(), "worker-1"),
trigger="interval",
seconds=30, # 轮询间隔
max_instances=1, # 防止并发执行
misfire_grace_time=10, # 错过执行时间的容忍秒数
coalesce=True, # 错过的多次执行合并为一次
)
scheduler.start()
6.1.2 配置参数说明
| 参数 | 默认值 | 说明 |
|---|---|---|
trigger |
"interval" |
固定间隔触发 |
seconds |
30 |
轮询间隔,根据事件产生频率调整 |
max_instances |
1 |
同一任务最多同时运行实例数,防止重叠执行 |
misfire_grace_time |
10 |
错过执行时间后的容忍秒数,超过则跳过 |
coalesce |
True |
多次错过的执行合并为一次 |
6.1.3 调度器生命周期
class OutboxScheduler:
"""调度器封装,管理生命周期和错误处理"""
def __init__(
self,
worker: OutboxWorker,
outbox_store: OutboxStore,
get_account_ids: Callable[[], list[str]],
worker_id: str,
interval_seconds: int = 30,
):
self._worker = worker
self._store = outbox_store
self._get_account_ids = get_account_ids
self._worker_id = worker_id
self._interval = interval_seconds
self._scheduler = BackgroundScheduler()
self._running = False
def start(self) -> None:
"""启动调度器"""
if self._running:
return
self._scheduler.add_job(
func=self._run_once_wrapped,
trigger="interval",
seconds=self._interval,
max_instances=1,
misfire_grace_time=10,
coalesce=True,
id="outbox_worker",
name=f"outbox_worker_{self._worker_id}",
)
# 注册事件监听
self._scheduler.add_listener(
self._on_job_error,
EVENT_JOB_ERROR | EVENT_JOB_MISSED
)
self._scheduler.start()
self._running = True
logger.info(f"Scheduler started, worker_id={self._worker_id}")
def stop(self, wait: bool = True) -> None:
"""停止调度器
Args:
wait: 是否等待当前任务完成
"""
if not self._running:
return
self._scheduler.shutdown(wait=wait)
self._running = False
logger.info(f"Scheduler stopped, worker_id={self._worker_id}")
def _run_once_wrapped(self) -> dict:
"""包装 run_once,添加日志和错误处理"""
try:
account_ids = self._get_account_ids()
stats = self._worker.run_once(
outbox_store=self._store,
account_ids=account_ids,
worker_id=self._worker_id,
)
logger.info(f"run_once completed: {stats}")
return stats
except Exception as e:
logger.error(f"run_once failed: {e}", exc_info=True)
raise
def _on_job_error(self, event) -> None:
"""任务执行错误回调"""
if event.exception:
logger.error(
f"Job {event.job_id} failed: {event.exception}",
exc_info=event.exception
)
elif event.code == EVENT_JOB_MISSED:
logger.warning(f"Job {event.job_id} missed")
6.1.4 获取活跃账户列表
get_active_account_ids() 需要自行实现,推荐方式:
方式一:枚举 AGFS 目录
def get_active_account_ids() -> list[str]:
"""从 AGFS 枚举所有活跃账户"""
accounts_dir = f"{mount_prefix}/accounts"
try:
entries = agfs_client.list(accounts_dir)
return [e.name for e in entries if e.is_dir]
except Exception as e:
logger.error(f"Failed to list accounts: {e}")
return []
6.1.5 多 Worker 部署
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ 多 Worker 部署架构 │
├─────────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────────────────────┐ │
│ │ AGFS (共享存储) │ │
│ │ │ │
│ │ /accounts/acme-corp/users/alice/memories/profile/.outbox/ │ │
│ │ /accounts/acme-corp/users/bob/memories/settings/.outbox/ │ │
│ │ /accounts/other-tenant/users/charlie/memories/notes/.outbox/ │ │
│ │ │ │
│ └───────────────────────────────────────────────────────────────────────────┘ │
│ ▲ │
│ ┌───────────────────┼───────────────────┐ │
│ │ │ │ │
│ ┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Thread-1 │ │ Thread-2 │ │ Thread-3 │ │
│ │ │ │ │ │ │ │
│ │ worker_id: "w1" │ │ worker_id: "w2" │ │ worker_id: "w3" │ │
│ │ interval: 30s │ │ interval: 30s │ │ interval: 30s │ │
│ │ │ │ │ │ │ │
│ │ APScheduler │ │ APScheduler │ │ APScheduler │ │
│ │ BackgroundScheduler│ │ BackgroundScheduler│ │ BackgroundScheduler│ │
│ │ │ │ │ │ │ │
│ └─────────────────────┘ └─────────────────────┘ └─────────────────────┘ │
│ │
│ 并发控制: │
│ - 每个线程独立运行调度器,通过 .processing 软锁实现并发控制 │
│ - worker_id 必须唯一,用于标识锁的持有者 │
│ - 锁超时 300 秒,防止 Worker 崩溃后事件无法被处理 │
│ │
└─────────────────────────────────────────────────────────────────────────────────────┘
多线程 Worker 启动示例:
import threading
import uuid
def start_worker_thread(worker_id: str, interval_seconds: int = 30):
"""启动单个 Worker 线程"""
scheduler = OutboxScheduler(
worker=worker,
outbox_store=outbox_store,
get_account_ids=get_active_account_ids,
worker_id=worker_id,
interval_seconds=interval_seconds,
)
scheduler.start()
return scheduler
# 启动多个 Worker 线程
worker_count = 3
schedulers = []
for i in range(worker_count):
worker_id = f"worker-{i+1}"
t = threading.Thread(
target=start_worker_thread,
args=(worker_id, 30),
name=f"outbox-worker-{i+1}",
daemon=True,
)
t.start()
schedulers.append(t)
print(f"Started {worker_count} worker threads")
worker_id 生成方式:
import os
import uuid
def get_worker_id() -> str:
"""获取唯一的 worker_id"""
return f"worker-{uuid.uuid4().hex[:8]}"
6.1.6 优雅关闭
import signal
import sys
scheduler = OutboxScheduler(...)
def signal_handler(signum, frame):
"""处理 SIGTERM/SIGINT 信号,优雅关闭"""
logger.info(f"Received signal {signum}, shutting down...")
scheduler.stop(wait=True) # 等待当前任务完成
sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
scheduler.start()
6.1.7 监控指标
from dataclasses import dataclass
from datetime import datetime
@dataclass
class SchedulerMetrics:
"""调度器运行指标"""
worker_id: str
last_run_time: datetime | None
last_run_stats: dict | None
total_processed: int
total_succeeded: int
total_failed: int
total_dlq: int
consecutive_errors: int
is_running: bool
# 在 _run_once_wrapped 中更新指标
def _run_once_wrapped(self) -> dict:
self._metrics.last_run_time = datetime.now()
try:
stats = self._worker.run_once(...)
self._metrics.last_run_stats = stats
self._metrics.total_processed += stats["processed"]
self._metrics.total_succeeded += stats["succeeded"]
self._metrics.total_failed += stats["failed"]
self._metrics.total_dlq += stats["moved_to_dlq"]
self._metrics.consecutive_errors = 0
return stats
except Exception as e:
self._metrics.consecutive_errors += 1
raise
6.2 补充测试
在现有测试文件里补充以下用例:
tests/unit/commit/test_outbox_store.py
| 测试用例 | 验证内容 |
|---|---|
test_try_acquire_returns_false_when_locked |
已有有效锁时返回 False |
test_try_acquire_respects_timeout |
锁超过 300 秒后可以被抢占 |
test_release_removes_processing_file |
release 后 .processing 文件消失 |
test_list_pending_skips_locked_events |
list_pending 不返回有活跃锁的事件 |
tests/unit/index/test_outbox_worker.py
以下两个用例已提交,验证通过即可,不需要重写:
test_run_once_skips_acquired_eventstest_run_once_releases_lock_on_exception
7. 集成方式
7.1 使用 IndexService(推荐)
IndexService 封装了 OutboxScheduler 的生命周期管理,提供更简洁的 API:
from pyagfs import AGFSClient
from commit.outbox_store import OutboxStore
from providers.embedder import MockEmbedder
from providers.vector_index import InMemoryVectorIndex
from service import IndexService, setup_signal_handlers
# 初始化依赖
agfs_client = AGFSClient()
outbox_store = OutboxStore(agfs_client)
embedder = MockEmbedder()
vector_index = InMemoryVectorIndex()
# 获取活跃账户列表
def get_active_account_ids():
return ["acme-corp", "other-tenant"]
# 创建并启动 IndexService
service = IndexService(
outbox_store=outbox_store,
embedder=embedder,
vector_index=vector_index,
get_account_ids=get_active_account_ids,
interval_seconds=30,
worker_count=3,
)
# 设置信号处理(优雅关闭)
setup_signal_handlers(service)
# 启动服务
service.start()
print("IndexService started, press Ctrl+C to stop")
# 保持主线程运行
import signal
try:
while True:
signal.pause()
except KeyboardInterrupt:
service.stop()
7.2 使用单例模式
from service import init_index_service, get_index_service
# 初始化全局实例
init_index_service(
outbox_store=outbox_store,
embedder=embedder,
vector_index=vector_index,
get_account_ids=get_active_account_ids,
)
# 获取并启动
service = get_index_service()
service.start()
# 获取运行统计
stats = service.get_aggregated_stats()
# {"worker_count": 3, "total_processed": 100, "total_succeeded": 95, ...}
7.3 作为独立进程运行
# 使用 mock embedder(测试)
python -m examples.run_index_service --mock --workers 3 --interval 30
# 使用 OpenAI embedder(生产)
python -m examples.run_index_service --workers 3 --interval 30
8. 与 ContextEngine 集成
8.1 完整集成示例
from pyagfs import AGFSClient
from core.interfaces import ContextFS
from commit.outbox_store import OutboxStore
from providers.embedder import MockEmbedder
from providers.vector_index import InMemoryVectorIndex
from providers.llm import MockLLM
from service import (
MemoryWriteAPI,
init_write_api,
IndexService,
init_index_service,
)
# 1. 初始化 AGFS 和存储
agfs_client = AGFSClient()
context_fs: ContextFS = ... # 你的 ContextFS 实现
outbox_store = OutboxStore(agfs_client)
# 2. 初始化写入 API(会自动注册 outbox 事件)
llm = MockLLM()
write_api = init_write_api(context_fs, llm, outbox_store)
# 3. 初始化索引服务(会消费 outbox 事件)
embedder = MockEmbedder()
vector_index = InMemoryVectorIndex()
def get_active_account_ids():
# 从 AGFS 或配置获取活跃账户
return ["acme-corp"]
init_index_service(
outbox_store=outbox_store,
embedder=embedder,
vector_index=vector_index,
get_account_ids=get_active_account_ids,
)
# 4. 启动索引服务
get_index_service().start()
# 5. 写入操作会自动触发异步索引
from core.models import RequestContext
ctx = RequestContext(account_id="acme-corp", user_id="alice")
write_api.commit_session(
messages=[{"role": "user", "content": "I prefer dark mode"}],
ctx=ctx,
)
# 写入完成后,outbox 事件会被 IndexService 异步处理
8.2 数据流
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ 完整数据流 │
├─────────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────────────────────┐ │
│ │ 写入路径 │ │
│ │ │ │
│ │ commit_session() │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ContextWriter.write_candidate() │ │
│ │ │ │ │
│ │ ├──► 写入 AGFS 节点文件 │ │
│ │ │ │ │
│ │ └──► OutboxStore.register_write() │ │
│ │ │ │ │
│ │ └──► 创建 .outbox/{event_id}.json │ │
│ │ │ │
│ └───────────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ │ 异步 │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────────────────┐ │
│ │ 索引路径 │ │
│ │ │ │
│ │ IndexService (多线程) │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ OutboxScheduler (每 30 秒) │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ OutboxWorker.run_once() │ │
│ │ │ │ │
│ │ ├──► list_pending() 扫描待处理事件 │ │
│ │ │ │ │
│ │ ├──► try_acquire() 获取锁 │ │
│ │ │ │ │
│ │ ├──► Embedder.embed() 生成向量 │ │
│ │ │ │ │
│ │ ├──► VectorIndex.upsert() 写入索引 │ │
│ │ │ │ │
│ │ └──► mark_done() / move_to_dlq() │ │
│ │ │ │
│ └───────────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────────┘