45 nop-stream 分布式 Exactly-Once 运行时实现
Plan Status: completed Last Reviewed: 2026-05-24 Source:
ai-dev/design/nop-stream/architecture.md§5(分布式控制面契约)、§8(设计原则)、§9(设计不变量);审计确认当前代码为零分布式基础设施 Related: Plan 44 (本地 runtime 模型层集成), Plan 42 (设计模型层创建)
Purpose
实现 nop-stream 设计文档描述的分布式 exactly-once 流处理引擎。当前状态:整个 nop-stream 是单 JVM 进程内引擎,不存在任何分布式基础设施(无 RPC、无网络传输、无集群管理、无 fencing、无 leader 选举、无并行度 > 1)。本计划从零构建分布式运行时,完成后 15 条设计不变量全部可验证。
Current Baseline
单 JVM 运行时事实:
| 维度 | 当前状态 |
|---|---|
| 线程模型 | TaskExecutor 固定线程池,一个 Task 一个线程 |
| 数据交换 | ResultPartition = LinkedBlockingQueue<StreamElement>(capacity 1024),InputChannel 是同一 queue 的包装 |
| Barrier 注入 | scheduler-push:ScheduledExecutorService 直接调用每个 StreamTaskInvokable 的 tracker |
| 并行度 | 硬编码 taskIndex=0,每个 vertex 一个 task |
| Checkpoint | 单 JVM 内 CheckpointCoordinator + PendingCheckpoint + ICheckpointStorage |
| 状态存储 | MemoryKeyedStateBackend(HashMap),LocalFileCheckpointStorage(JSON 文件),JdbcCheckpointStorage |
| 网络传输 | 不存在。无 Netty、无 gRPC、无 socket、无 RPC |
| 集群管理 | 不存在。无 node、cluster、lease、fencing、heartbeat 概念 |
| 序列化 | 数据交换无序列化(直接对象引用);checkpoint 用 JsonTool |
Plan 44 完成后的增量(本地 runtime 模型集成):
- PartitionedPlan/DeploymentPlan 被生成和消费
- CheckpointParticipant 在 operator 快照阶段被调用
- ProcessingGuarantee 影响 InputGate barrier 对齐
- EdgeConfig 被 RecordWriter 读取
- 连接器声明一致性能力
- EpochManifest 持久化与恢复
- Fingerprint 兼容性校验
- StateShard 路由
完全缺失的分布式基础设施(本计划要构建):
| 组件 | 说明 |
|---|---|
| 消息传输层 | 替代 LinkedBlockingQueue 的跨进程数据交换 |
| 序列化层 | 数据记录、barrier、watermark 的跨进程序列化 |
| TaskManager | 独立 JVM 进程,承载 task attempt,汇报心跳 |
| JobCoordinator | 集群单点,持有 canonical plan,调度 task,触发 epoch,维护 fencing |
| ClusterRegistry | 一致视图:active coordinator、注册 nodes、node lease、task assignment |
| NodeLease / fencing | 失败检测和旧 attempt 隔离 |
| 并行度支持 | 按 PartitionedPlan 创建多个 subtask |
| barrier 注入迁移 | 从 scheduler-push 改为 source-pull |
Goals
实现设计文档 architecture.md §5 定义的分布式控制面契约和 §9 定义的 15 条设计不变量。具体目标:
- 多 TaskManager 执行:一个作业可部署到多个 TaskManager(嵌入式线程池模式或独立进程模式)上执行
- 跨 TaskManager 数据交换:RecordWriter 通过 IMessageService 发送到远程 InputGate
- 并行度 > 1:按 PartitionedPlan 创建 N 个 subtask,HASH/FORWARD/REBALANCE 分区正确路由
- source-pull barrier 注入:barrier 在 SourceContext.collect() 中由 source 读取线程注入(不变量 #4)
- fencing:所有跨 TaskManager 消息携带 fencing token,旧 attempt 的输出被拒绝(不变量 #8,四个场景全覆盖)
- epoch manifest durable 后 sink commit:coordinator 持久化 manifest 后才通知 commit,sink commit 幂等(不变量 #5)
- 全局恢复:失败后从最新 durable epoch 恢复所有 task(
checkpoint-design.md§8.1) - subsuming contract:commit 通知传递 epochId,sink 提交所有 epoch ≤ N 的 pending transaction(
checkpoint-design.md§2.7) - JobTerminationMode 四模式:CANCEL/DRAIN/SUSPEND/EXPORT_SAVEPOINT 完整实现(不变量 #15)
Non-Goals
- 不实现 Netty 网络栈(使用 Nop 平台已有的
IMessageService作为传输层) - 不实现二进制序列化(使用 JsonTool JSON 序列化 + 类型携带方案)
- 不实现 unaligned checkpoint
- 不实现 rescale / state redistribution(后续计划)
- 不实现局部恢复(初始版本只用全局恢复)
- 不实现 CREDIT_BASED / ACK_WINDOW 流控(只用 BLOCKING_QUEUE + 消息队列背压)
- 不实现 SlotSharingGroup / ResourceManager(Flink 概念,设计文档明确排除)
- 不实现 Coordinator HA:coordinator 是单点故障,崩溃后需手动重启。EpochManifest 已持久化到 JDBC,重启后可恢复。Coordinator failover 后的自动 leader election 和 state transfer 是后续计划
- 不实现 TaskManager 常驻 daemon(初始版本按作业生命周期管理)
关键架构决策(经三轮对抗性审查确认)
决策 1:传输层选择——IMessageService + 延迟 ACK
选择:使用 Nop 平台的 IMessageService 作为分布式数据交换传输层。
理由:
- Nop 已有
LocalMessageService(进程内)和PulsarMessageService(Apache Pulsar) - 消息队列天然提供持久化、分区、背压
- 符合设计文档 §7.1 "消息队列:IMessageService 桥接"
消息可靠性契约(解决 IMessageService 无 exactly-once 语义的问题):
| 机制 | 说明 |
|---|---|
发送端:RemoteResultPartition 发送 JSON 字符串(非 Object),确保即使是 LocalMessageService 也经过序列化/反序列化路径(或在测试模式下启用 verifySerialization 标志) |
解决 P1-2(LocalMessageService 绕过序列化) |
消费端延迟 ACK:RemoteInputChannel.onMessage() 将消息入 LinkedBlockingQueue 后不立即 ACK,而是返回 CompletionStage 在 task 线程消费完成后才 complete。如果 IMessageService 不支持延迟 ACK(如 LocalMessageService),则退化为即时 ACK(进程内不丢消息) |
解决 T4-1(消息可丢失) |
| barrier 消息可靠性:barrier 消息携带 fencing token + epochId,如果 barrier 丢失(checkpoint 超时),coordinator 重试触发新 epoch。幂等保证:重复 barrier 不导致重复快照 | 解决 barrier 丢失风险 |
传输模型:
- 每个 edge 的每个 (sourceSubtask, targetSubtask) 对应一个消息 topic
- topic 命名:
nop-stream.{jobId}.{edgeId}.{sourceSubtask}.{targetSubtask} - topic 数量估算:parallelism=4 时约 36 个 topic,parallelism=16 时约 528 个(设计文档目标 "中等规模 ETL",可接受)
决策 2:TaskManager 双模式运行
选择:TaskManager 支持两种运行模式:
| 模式 | 说明 | 适用场景 |
|---|---|---|
| 嵌入式 | TaskManager 在 submitter JVM 内以线程池运行,task 之间通过 LocalMessageService 交换数据 |
测试、单机部署、开发 |
| 独立进程 | TaskManager 在独立 JVM 中运行,通过 PulsarMessageService 交换数据,由外部编排(脚本/K8s)启动 |
生产部署 |
嵌入式模式是初始版本的基线。独立进程模式通过配置切换(IMessageService 实现类),无需额外代码。
理由:
- 嵌入式模式下多 TaskManager = 多线程池,仍然是"多 task 并行执行"(满足 Goals #1 "一个作业可部署到多个 TaskManager 上执行"的语义,即使物理上在同一 JVM)
- 独立进程模式只需切换
IMessageService实现和启动方式 - 符合设计文档 §8.4 "可移植后端":本地 runtime 和分布式 runtime 使用同一语义
决策 3:序列化——类型携带方案
选择:StreamElementCodec 编码时在每条消息中携带 valueType 类名。StreamRecord 序列化时写入 "@valueType": "io.nop.example.Transaction" 字段,反序列化时根据此字段选择正确的 Java 类型。
具体方案:
PartitionedPlan中每条EdgePlan记录outputTypeClassName(从 StreamGraph 的 StreamEdge.outputType 获取)RemoteResultPartition编码时从 PartitionedPlan 读取类型信息,写入消息头RemoteInputChannel解码时从消息头读取类型信息,用JsonTool.parseBean(json, Class.forName(valueType))反序列化CheckpointBarrier/Watermark/WatermarkStatus无泛型,直接 JSON round-trip
理由:
- 解决
StreamRecord<T>泛型擦除问题(P1-1/R4-1) - 类型信息从 Pipeline 定义(PartitionedPlan)获取,不需要运行时反射
- 符合设计文档 "模型优先" 原则
决策 4:source-pull barrier 注入——SourceContext 内置检查点
选择:修改 SourceContext.collect() 实现,在每次 collect() 调用时检查 pending barrier 信号。
具体方案:
StreamSourceOperator内部维护一个BlockingQueue<CheckpointBarrier> pendingBarriers(容量 1)CheckpointBarrierTracker.triggerCheckpoint()不再直接调用sourceOp.injectBarrier(),而是将 barrier 放入pendingBarriers队列SourceContext.collect(T record)在 emit 记录之前检查pendingBarriers.poll():- 如果有 pending barrier → 先 emit barrier → 再 emit record
- 如果没有 → 直接 emit record
- 这样 barrier 注入发生在 source 读取线程的
collect()调用中(不变量 #4) - 不需要修改
SourceFunction接口——barrier 检查在SourceContext实现中
理由:
- 解决 P4-1(source-pull 需要 SourceFunction 内部检查点但 run() 是不透明循环)
- SourceContext 是 nop-stream 内部类,修改不影响用户代码
- barrier 在
collect()调用间隙注入,不中断 source function 的运行循环
决策 5:fencing token 传递机制
选择:fencing token 嵌入到每条 IMessageService 消息的消息头中(非消息体),RemoteInputChannel 在消费端验证。
具体方案:
- 所有跨进程消息使用统一的信封格式:
StreamMessageEnvelope { String fencingToken, long epochId, String type, Object payload } RemoteResultPartition.write()将 fencing token(从 TaskManager 上下文获取)写入信封RemoteInputChannel收到消息后检查 fencing token:- token 与本地最新 token 一致 → 接受
- token 比本地旧 → 拒绝(旧 attempt 的输出)
- token 比本地新 → 更新本地 token 并接受(新 attempt 或新 coordinator)
CheckpointBarrier消息中额外携带 coordinator fencing token,source task 只响应带有效 token 的 barrier(不变量 #4 "source task 只响应带有效 fencing token 的 epoch")
覆盖 fencing 四个场景(checkpoint-design.md §8.2):
| 场景 | 机制 |
|---|---|
| task restart | 新 attempt 获得新 fencing token → 旧 attempt 的 RemoteResultPartition 写入的消息被拒绝 |
| coordinator failover | 新 coordinator 获得新 token → 旧 coordinator 触发的 barrier 被拒绝 |
| sink commit | CheckpointParticipant.finishCommit() 携带 epochId → 幂等 commit(重复调用不产生副作用) |
| transport write | RemoteInputChannel 验证每条消息的 fencing token |
决策 6:ClusterRegistry 用 JDBC
选择:ClusterRegistry 用 JDBC 实现(利用已有的 IJdbcTemplate)。
DDL 定义:
CREATE TABLE nop_stream_coordinator (
job_id VARCHAR(128) NOT NULL,
coordinator_id VARCHAR(128) NOT NULL,
fencing_token VARCHAR(128) NOT NULL,
registered_at TIMESTAMP NOT NULL,
PRIMARY KEY (job_id)
);
CREATE TABLE nop_stream_node (
node_id VARCHAR(128) NOT NULL,
endpoint VARCHAR(256),
capacity INT,
last_heartbeat TIMESTAMP,
PRIMARY KEY (node_id)
);
CREATE TABLE nop_stream_task_assignment (
job_id VARCHAR(128) NOT NULL,
vertex_id VARCHAR(128) NOT NULL,
subtask_index INT NOT NULL,
node_id VARCHAR(128) NOT NULL,
attempt_id VARCHAR(128) NOT NULL,
fencing_token VARCHAR(128) NOT NULL,
assigned_at TIMESTAMP,
PRIMARY KEY (job_id, vertex_id, subtask_index)
);
决策 7:并行度支持——GraphExecutionPlan.build() 重构
选择:GraphExecutionPlan.build() 按 PartitionedPlan 的 parallelism 创建 N × M 个 partition(每对 sourceSubtask-targetSubtask 一个)。
具体方案:
- 当前
build()为每条 JobEdge 创建一个ResultPartition - 重构为:为每条 JobEdge 的每个 (sourceSubtask, targetSubtask) 对创建一个
ResultPartition - 每个 sourceSubtask 的
RecordWriter持有 targetParallelism 个 partition RecordWriter.selectChannel()按 partition policy 选择目标 partition- CheckpointPlan 包含 parallelism × vertexCount 个 TaskLocation
决策 8:JobTerminationMode 四模式完整实现
选择:在 Phase 7(JobCoordinator)中完整实现 CANCEL/DRAIN/SUSPEND/EXPORT_SAVEPOINT 四种终止模式。
| 模式 | 实现方式 |
|---|---|
| CANCEL | coordinator 通过控制 topic 发送 CANCEL 信号,所有 TaskManager 停止 task |
| DRAIN | coordinator 发送 DRAIN 信号,source task truncate(DrainableSource)后触发 terminal checkpoint |
| SUSPEND | coordinator 触发 savepoint,durable 后发送 SUSPEND 信号,所有 task 暂停 |
| EXPORT_SAVEPOINT | coordinator 触发 savepoint 写入 protected namespace,作业继续运行 |
决策 9:Coordinator HA 降级为 Non-Goal
选择:coordinator 是单点故障。本计划不实现 coordinator HA(不实现 coordinator failover 后自动恢复)。coordinator 崩溃后需要手动重启,从最新 durable EpochManifest 恢复。
理由:
- coordinator HA 需要分布式共识(leader election + state transfer),这是一个独立的架构层
- 设计文档 §8.3 "coordinator 是逻辑单点,但不能成为 exactly-once 的单点故障"——但 "不能" 指的是不能丢失状态(通过 durable epoch log 保证),而非不能有单点
- 当前实现:coordinator 崩溃后状态不丢失(EpochManifest 已持久化到 JDBC),手动重启后可恢复
- 正式列入 Non-Goals 并说明影响
Execution Plan
Phase 0 - 信封与序列化基础设施
Status: completed
Targets: nop-stream-core (StreamElement, StreamRecord, CheckpointBarrier, Watermark, WatermarkStatus), 新增 StreamMessageEnvelope, StreamElementCodec
- Item Types:
Decision,Proof
目标:建立跨 TaskManager 消息传递的基础——统一信封格式(含 fencing token)、类型携带序列化、确保所有 StreamElement 子类可 JSON round-trip。
工作项:
- 新增
StreamMessageEnvelope数据类:字段包括fencingToken(String)、epochId(long)、type(String:STREAM_RECORD / CHECKPOINT_BARRIER / WATERMARK / WATERMARK_STATUS / CONTROL)、valueType(String,仅 STREAM_RECORD 使用)、payload(Object,实际数据) - 新增
StreamElementCodec:encode(StreamElement, String valueType, String fencingToken)→StreamMessageEnvelope;decode(StreamMessageEnvelope)→StreamElement。编码时:StreamRecord 写入 valueType + payload JSON;CheckpointBarrier/Watermark/WatermarkStatus 不需要 valueType - 验证并修复所有
StreamElement子类的 JSON round-trip(添加@DataBean/@JsonCreator注解) - 新增
TypeRegistry:存储 edge → outputTypeClassName 的映射(从 PartitionedPlan 填充),StreamElementCodec编码时从中读取类型信息 - 测试:每种 StreamElement 子类的 encode → JSON 字符串 → decode round-trip
- 测试:StreamMessageEnvelope 携带 fencing token 并在消费端验证
Exit Criteria:
- StreamMessageEnvelope 包含 fencingToken、type、valueType、payload
- StreamElementCodec 对 StreamRecord 正确携带 valueType(解决泛型擦除问题)
- 所有 StreamElement 子类可 JSON round-trip
- 端到端验证:StreamRecord → encode → JSON → decode → 内容与原始一致(包括 T 的具体类型恢复)
- 接线验证:fencing token 在信封中携带,消费端可读取
- 无静默跳过:未知 type 值抛出异常;valueType 对应的类不存在时抛出异常
- No owner-doc update required
-
ai-dev/logs/对应日期条目已更新
Phase 1 - source-pull barrier 注入
Status: completed
Targets: nop-stream-core (StreamSourceOperator, SourceContext, CheckpointBarrierTracker), nop-stream-runtime (GraphModelCheckpointExecutor)
- Item Types:
Decision,Fix
目标:实现不变量 #4 "barrier 只能由 source 读取线程注入"。通过修改 SourceContext.collect() 在数据循环中注入 barrier,不修改 SourceFunction 接口。
工作项:
- 修改
StreamSourceOperator:新增BlockingQueue<CheckpointBarrier> pendingBarriers(容量 1)字段 - 修改
SourceContext.collect(T record)实现:在 emit 记录之前poll()pendingBarriers,如果有则先调用output.emitBarrier()再 emit record - 修改
CheckpointBarrierTracker.triggerCheckpoint():不再直接调用sourceOp.injectBarrier(),改为将 barrier 放入pendingBarriers队列(非阻塞offer(),如果队列已有 barrier 则拒绝重复触发) - 修改
GraphModelCheckpointExecutor:不再通过triggerBarrierOnAllInvokables()直接调用每个 invokable 的 tracker,改为通知 coordinator 触发(通过信号或直接调用tracker.triggerCheckpoint()——后者仍然有效,只是 barrier 注入时机不同) - 测试:barrier 在 source 读取线程的
collect()调用中被注入(验证线程名) - 测试:source 在 barrier 注入后继续处理 post-barrier 数据
Exit Criteria:
-
SourceContext.collect()在 emit 记录前检查并注入 pending barrier -
CheckpointBarrierTracker.triggerCheckpoint()不再直接调用sourceOp.injectBarrier() - barrier 注入发生在 source 读取线程,不在 scheduler 线程
- 端到端验证:checkpoint 周期完整,barrier 在数据流中正确传播
- 接线验证:通过线程名断言验证 barrier 注入发生在 source 读取线程
- 无静默跳过:pendingBarriers 队列满时拒绝重复触发(不覆盖已有 barrier)
- 所有现有 checkpoint 测试通过
- 相关
ai-dev/design/nop-stream/checkpoint-design.md§2.3 已更新 -
ai-dev/logs/对应日期条目已更新
Phase 2 - 并行度支持:多 subtask 创建、分区路由与 CheckpointCoordinator 改造
Status: completed
Targets: nop-stream-runtime (GraphExecutionPlan, CheckpointPlanBuilder, CheckpointCoordinator, PendingCheckpoint), nop-stream-core (RecordWriter, InputGate)
- Item Types:
Decision,Proof
目标:按 PartitionedPlan 的 parallelism 创建多个 subtask,实现 HASH/FORWARD/REBALANCE 分区路由,改造 CheckpointCoordinator 支持多 subtask per vertex。使用本地 ResultPartition(LinkedBlockingQueue)验证并行度正确性,不涉及 IMessageService 传输。
工作项:
- 重构
GraphExecutionPlan.build():- 按 PartitionedPlan 中每个 vertex 的 parallelism 创建 N 个 Task(不同 taskIndex)
- 为每条 JobEdge 的每个 (sourceSubtask, targetSubtask) 对创建一个本地
ResultPartition(LinkedBlockingQueue) - 每个 sourceSubtask 的 RecordWriter 持有 targetParallelism 个 partition
- 修改
RecordWriter:selectChannel()按 PartitionPolicy 选择:HASH →StateShard.stableHash(key) % partitions.length;FORWARD → sourceSubtask == targetSubtask 的 partition;REBALANCE → round-robin - 在
GraphExecutionPlan.build()中从 PartitionedPlan 的 EdgePlan.outputTypeClassName 填充 TypeRegistry(edge → outputTypeClassName 映射) - 修改
CheckpointPlanBuilder:为每个 subtask(vertex + taskIndex)生成 TaskLocation,ACK 集合包含所有 subtask - 修改
CheckpointCoordinator:registerTask()和acknowledgeTask()支持多个 subtask per vertex - 修改
PendingCheckpoint:tasksToAcknowledge包含 parallelism × vertexCount 个 TaskLocation - EdgeConfig 集成:
ResultPartition从 DeploymentPlan.edgeConfigs 读取 EdgeConfig,BLOCKING_QUEUE = queue 容量限制 - 测试:parallelism=2 的 source → keyBy → sink 端到端跑通(使用本地 queue)
- 测试:HASH 分区正确路由(同一 key 到同一 subtask)
- 测试:FORWARD / REBALANCE 分区正确路由
- 测试:parallelism=2 的 checkpoint 周期完整(所有 subtask ACK)
Exit Criteria:
- GraphExecutionPlan 按 PartitionedPlan.parallelism 创建 N 个 Task 和 N×M 个 ResultPartition
- HASH 分区正确路由(同一 key 始终到同一 subtask)
- FORWARD / REBALANCE 分区正确路由
- TypeRegistry 在 build() 中从 PartitionedPlan 填充
- CheckpointPlan 包含所有 subtask 的 TaskLocation
- CheckpointCoordinator 支持 multiple subtask per vertex
- PendingCheckpoint 的 tasksToAcknowledge 包含 parallelism × vertexCount 个 TaskLocation
- EdgeConfig 在 ResultPartition 创建时被读取
- 端到端验证:parallelism=2 的 source → keyBy → sink 端到端跑通(本地 queue)
- 端到端验证:parallelism=2 的 checkpoint 周期完整
- 接线验证:RecordWriter.selectChannel() 根据分区策略选择不同 subtask 的 partition
- 接线验证:CheckpointCoordinator.acknowledgeTask() 收齐所有 subtask 的 ACK
- 无静默跳过:未知 PartitionPolicy 抛出异常
- 现有 parallelism=1 端到端测试不受影响
- 相关
ai-dev/design/nop-stream/graph-model-design.md已更新 -
ai-dev/logs/对应日期条目已更新
Phase 3 - 消息传输层:RemoteResultPartition / RemoteInputChannel
Status: completed
Targets: nop-stream-core (ResultPartition, InputChannel), 新增 nop-stream-runtime 传输类
- Item Types:
Decision,Proof
目标:用 IMessageService 传输替换本地 queue,实现跨 TaskManager 数据交换。Phase 2 已验证并行度正确性,本 Phase 只替换传输层。
工作项:
- 新增
RemoteResultPartition(持有IMessageService+ topic +StreamElementCodec):write()将 StreamElement 编码为StreamMessageEnvelope(含 fencing token),通过IMessageService.send(topic, envelopeJson)发送 - 新增
RemoteInputChannel:订阅 IMessageService topic,onMessage()将消息放入LinkedBlockingQueue,延迟 ACK(返回CompletionStage,在read()消费完成后 ACK)。消费端验证 fencing token - 定义 topic 命名规则:
nop-stream.{jobId}.{edgeId}.{sourceSubtask}.{targetSubtask} - 修改
GraphExecutionPlan.build():根据 TaskManager 分布选择创建ResultPartition(同 TaskManager)或RemoteResultPartition(跨 TaskManager) - 修改
RecordWriter:构造函数支持RemoteResultPartition[](向后兼容ResultPartition[]) - 修改
InputGate:构造函数支持RemoteInputChannel[] - barrier 和 watermark 广播:RecordWriter.emitBarrier() / emitWatermark() 发送到所有下游 subtask 的 topic
- EdgeConfig 集成:
RemoteResultPartition从 DeploymentPlan.edgeConfigs 读取 EdgeConfig,BLOCKING_QUEUE = 消息队列容量限制 - 测试:单进程内通过 LocalMessageService 的 RemoteResultPartition/RemoteInputChannel 数据交换
- 测试:barrier 通过 RemoteResultPartition 广播到所有 RemoteInputChannel
- 测试:延迟 ACK 机制正确(消息在 task 线程消费后才 ACK)
Exit Criteria:
- RemoteResultPartition 通过 IMessageService 发送 StreamMessageEnvelope
- RemoteInputChannel 通过 IMessageService 接收并验证 fencing token
- 延迟 ACK 机制正确:消息在 task 线程消费后才 ACK
- GraphExecutionPlan.build() 根据部署模式选择本地/远程 partition
- barrier 通过 RemoteResultPartition 广播到所有 RemoteInputChannel
- EdgeConfig 在 RemoteResultPartition 创建时被读取
- 端到端验证:通过 LocalMessageService 的 RemoteResultPartition/RemoteInputChannel,parallelism=2 端到端跑通
- 端到端验证:parallelism=2 的 checkpoint 周期完整(barrier 通过 IMessageService 传播)
- 接线验证:RecordWriter.emitBarrier() 通过 RemoteResultPartition 发送,RemoteInputChannel 收到后 InputGate 正确对齐
- 无静默跳过:发送失败时抛出异常;fencing token 不匹配拒绝消息
- Phase 2 的本地 queue 测试不受影响
- 相关
ai-dev/design/nop-stream/architecture.md§6 已更新 -
ai-dev/logs/对应日期条目已更新
Phase 4 - ClusterRegistry:集群状态持久化
Status: completed
Targets: nop-stream-runtime 新增 cluster 包
- Item Types:
Decision,Proof
目标:实现 ClusterRegistry——记录 active coordinator、注册 nodes、node lease、task assignment 的一致视图。
工作项:
- 新增
ClusterRegistry接口:registerCoordinator、getActiveCoordinator、registerNode、renewLease、getNodeLease、getActiveNodes、assignTask、getTaskAssignment、removeTaskAssignment - 新增数据模型类:
CoordinatorInfo、NodeInfo、LeaseInfo、TaskAssignment - 新增
JdbcClusterRegistry:基于IJdbcTemplate,使用决策 6 定义的 DDL(3 张表) - 新增 DDL 自动建表(
JdbcClusterRegistry初始化时检测表是否存在,不存在则创建) - Lease 过期检测:
getNodeLease()检查lastHeartbeat + leaseTimeout < now - 测试:registerNode → renewLease → getNodeLease → 过期检测
- 测试:assignTask → getTaskAssignment → removeTaskAssignment
- 测试:coordinator 注册、查询、fencing token 更新
Exit Criteria:
- ClusterRegistry 接口定义完整
- JdbcClusterRegistry 基于 IJdbcTemplate 实现,自动建表
- Lease 过期检测正确
- 端到端验证:registerCoordinator → registerNode → assignTask → getTaskAssignment 完整链路
- 接线验证:JdbcClusterRegistry 使用 IJdbcTemplate 执行 SQL
- 无静默跳过:lease 过期时 getNodeLease 返回过期状态
-
ai-dev/logs/对应日期条目已更新
Phase 5 - TaskManager:task 执行服务
Status: completed
Targets: nop-stream-runtime 新增 taskmanager 包
- Item Types:
Decision,Proof
目标:TaskManager 是 task 执行服务。嵌入式模式下在 submitter JVM 内运行(多线程池),独立进程模式下在独立 JVM 中运行。
工作项:
- 新增
TaskManager类:持有 nodeId、IMessageService、ClusterRegistry 引用。嵌入式模式下为 submitter JVM 内的线程池 - 实现 task 接收:监听 task assignment 控制消息(
nop-stream.{jobId}.assignment.{nodeId}),收到后创建StreamTaskInvokable(通过 RemoteResultPartition/RemoteInputChannel 交换数据) - 实现心跳:定期调用
ClusterRegistry.renewLease(nodeId, leaseTimeout) - 实现 task 生命周期:start task → run → complete/failed → report to coordinator
- 实现 checkpoint ACK:task 完成 checkpoint 快照后,通过控制 topic 发送 ACK 到 coordinator(携带 fencing token)
- 实现 barrier 信号接收:source task 监听控制 topic 的 barrier 触发信号,放入 pendingBarriers 队列
- 实现 fencing token 管理:TaskManager 维护当前 fencing token,创建新 task 时从 assignment 中获取,旧 token 的操作被拒绝
- 测试:TaskManager 接收 assignment → 创建 task → 运行 → 完成
- 测试:fencing token 验证——旧 token 的 ACK 被拒绝
Exit Criteria:
- TaskManager 可接收 task assignment 并创建 task
- TaskManager 通过 IMessageService 交换数据(RemoteResultPartition/RemoteInputChannel)
- 心跳续约正常
- Fencing token 在 task assignment 和 ACK 中验证
- 端到端验证:coordinator 分配 task → TaskManager 接收 → 数据处理完成 → 结果正确
- 接线验证:TaskManager 调用 ClusterRegistry.assignTask() 和 renewLease()
- 无静默跳过:task 执行失败时报告到 coordinator
- 相关
ai-dev/design/nop-stream/architecture.md§5 已更新 -
ai-dev/logs/对应日期条目已更新
Phase 6 - JobCoordinator:分布式调度、fencing 与全局恢复
Status: completed
Targets: nop-stream-runtime 新增 coordinator 包,修改 GraphModelCheckpointExecutor
- Item Types:
Decision,Proof
目标:JobCoordinator 是集群单点,负责生成 DeploymentPlan、分配 task 到 TaskManager、触发 checkpoint epoch、维护 fencing token、处理全局恢复、实现四种 JobTerminationMode。
工作项:
- 新增
JobCoordinator:持有 canonical PartitionedPlan/DeploymentPlan,生成 fencing token(UUID),注册到 ClusterRegistry。内部持有CheckpointCoordinator实例用于 ACK 收集和 manifest 生成(分布式模式下 CheckpointCoordinator 的 trigger/ack 职责委托给 JobCoordinator,但 ACK 收集和 manifest 生成逻辑复用 CheckpointCoordinator) - 实现 task assignment:按 DeploymentPlan 将每个 subtask 分配到 TaskManager,通过控制 topic 发送 assignment(含 DeploymentPlan 片段、fencing token、operator chain 配置)
- 实现 fencing:所有 assignment 和控制消息携带 fencing token。TaskManager 验证后执行。旧 attempt 的 RemoteResultPartition 消息被 RemoteInputChannel 拒绝
- 实现 checkpoint 触发:coordinator 生成 epoch,通过控制 topic 发送 barrier 触发信号到所有 source task(携带 fencing token)
- 实现 ACK 收集:监听 checkpoint ACK 控制消息(携带 fencing token + TaskStateSnapshot),验证 token 后收集
- 实现 epoch manifest 持久化:所有 ACK 收齐后生成 EpochManifest(含 planFingerprint + fencingToken)并持久化
- 实现 commit 通知(subsuming contract):manifest durable 后,通过控制 topic 发送
notifyCheckpointComplete(epochId)。Sink 提交所有 epoch ≤ epochId 的 pending transaction - 实现 TaskManager 故障检测:定期检查 ClusterRegistry 的 node lease,过期则触发全局恢复
- 实现全局恢复:fence 所有旧 task(生成新 fencing token)→ 从最新 durable EpochManifest 读取状态 → 重新分配 task → 恢复状态
- 实现 JobTerminationMode 四模式:
- CANCEL:发送 CANCEL 信号到所有 TaskManager,立即停止
- DRAIN:发送 DRAIN 信号,source task 调用 DrainableSource.truncateForDrain(),触发 TERMINAL_SAVEPOINT
- SUSPEND:触发 savepoint,durable 后发送 SUSPEND 信号
- EXPORT_SAVEPOINT:触发 savepoint 到 protected namespace,作业继续
- 修改
GraphModelCheckpointExecutor:分布式模式下委托给 JobCoordinator - 测试:coordinator → 分配 task → barrier 触发 → ACK → manifest 持久化 → commit 通知
- 测试:全局恢复(模拟 TaskManager 故障)
- 测试:四种 JobTerminationMode 各自的行为
- 测试:subsuming contract(连续两个 checkpoint,第二个 commit 时两个 pending transaction 都被提交)
Exit Criteria:
- JobCoordinator 持有 canonical plan,生成 fencing token
- Task assignment 通过控制 topic 下发(携带 fencing token)
- Fencing 四个场景全覆盖:task restart、coordinator token、sink commit 幂等、transport write 验证
- Barrier 触发通过控制 topic 发送到 source task
- ACK 收集通过控制 topic 完成(验证 fencing token)
- Epoch manifest 在所有 ACK 收齐后持久化
- Commit 通知实现 subsuming contract(传递 epochId,sink 提交所有 ≤ epochId 的 pending transaction)
- 全局恢复正确(新 fencing token → fence 旧 task → 从 durable manifest 恢复)
- Checkpoint 超时后 coordinator 正确触发新 epoch(旧 epoch 被标记为 ABORTED)
- JobCoordinator 与 CheckpointCoordinator 关系明确(JobCoordinator 持有 CheckpointCoordinator 实例,委托 ACK 收集和 manifest 生成)
- JobTerminationMode 四模式各自行为正确
- 端到端验证:两 TaskManager 的 source → keyBy → sink,checkpoint 周期完整
- 端到端验证:一 TaskManager 故障后全局恢复,数据无丢失无重复
- 接线验证:fencing token 在 assignment、barrier 触发、ACK、commit 中全程传递和验证
- 无静默跳过:fencing token 不匹配时拒绝操作
- 不变量验证:#4、#5、#8
- 相关
ai-dev/design/nop-stream/architecture.md§5、ai-dev/design/nop-stream/checkpoint-design.md§8 已更新 -
ai-dev/logs/对应日期条目已更新
Phase 7 - 连接器分布式协议升级
Status: completed
Targets: nop-stream-connector, nop-stream-core (SourceEnumeratorState)
- Item Types:
Decision,Proof
目标:connector 支持分布式 source split 分配、offset checkpoint、drain 截断。
工作项:
- 新增
SourceEnumerator(运行在 coordinator 端):管理 source split 发现、分配到 subtask、重分配。状态SourceEnumeratorState进入 EpochManifest - 修改
MessageSourceFunction:支持多分区并行消费(每个 subtask 订阅不同 topic/partition) - 修改
DebeziumCdcSourceFunction:实现DrainableSource.truncateForDrain() - 实现 source split 的 checkpoint:每个 subtask 的 split cursor 进入 EpochManifest
- 实现 SourceEnumerator 恢复:从 EpochManifest 恢复 split registry 和 assignment
- 测试:parallelism=2 source,split 正确分配和恢复
- 测试:DRAIN 模式下 source truncate 后 terminal checkpoint
Exit Criteria:
- SourceEnumerator 管理 split 分配和恢复
- 多 subtask source 各自消费不同 split
- DRAIN 模式触发 source truncate
- SourceEnumeratorState 进入 EpochManifest
- 端到端验证:parallelism=2 source → keyBy → sink 端到端跑通
- 端到端验证:DRAIN 模式下 terminal checkpoint 完成后作业结束
- 接线验证:恢复时从 EpochManifest 恢复 SourceEnumeratorState
- 无静默跳过:split 分配冲突时抛出异常
- 相关
ai-dev/design/nop-stream/connector-design.md§4 已更新 -
ai-dev/logs/对应日期条目已更新
Phase 8 - 全链路分布式 exactly-once 验证
Status: completed Targets: 全模块
- Item Types:
Proof
目标:验证 15 条设计不变量全部成立。
工作项:
- 编写分布式端到端测试:2 TaskManager(嵌入式),source(REPLAYABLE) → keyBy(parallelism=2) → two-phase-commit-sink(TWO_PHASE_COMMIT),ProcessingGuarantee=STRICT_EXACTLY_ONCE
- 编写分布式 timer state 测试:source → keyBy → window(并行度=2) → sink,验证 timer state 在 checkpoint 快照中正确保存和恢复
- 逐一验证 15 条不变量(见 Exit Criteria)
- 编写分布式恢复测试:一 TaskManager 故障 → 全局恢复 → 继续处理 → 数据无丢失无重复
- 编写分布式 DRAIN 测试:DRAIN → terminal checkpoint → 作业结束 → 状态恢复 → 继续
- 编写分布式 fencing 测试:旧 attempt 的输出被拒绝、旧 coordinator 的 barrier 被拒绝
- 编写 subsuming contract 测试:连续 checkpoint,第二个 commit 时两个 pending transaction 都被提交
- 完整回归测试:
./mvnw test -pl nop-stream -am
Exit Criteria:
- 不变量 #1:状态恢复按 operatorId 路由(验证恢复后 operator state 正确)
- 不变量 #2:StateShard 路由确定性(同一 key 在不同 JVM 实例路由到同一 subtask)
- 不变量 #3:PartitionedPlan 是并行度、分区、状态路由的唯一来源
- 不变量 #4:barrier 由 source 读取线程注入(线程名断言)
- 不变量 #5:manifest durable 后 sink commit(验证 commit 时序)
- 不变量 #6:恢复从最新 durable epoch 开始(验证恢复时使用的 epochId)
- 不变量 #7:STRICT_EXACTLY_ONCE 校验 source/sink 能力
- 不变量 #8:旧 attempt 被 fencing(fencing 测试通过)
- 不变量 #9:timer state 进入 checkpoint
- 不变量 #11:StreamModel 包含 StreamComponents
- 不变量 #12:StreamRequirement 在编译时和运行时校验
- 不变量 #13:transactional operator 实现 CheckpointParticipant
- 不变量 #14:分布式 edge 配置 EdgeConfig
- 不变量 #15:JobTerminationMode 四模式各自正确
- 端到端验证:分布式 source → keyBy → two-phase-commit-sink 完整路径跑通
- 接线验证:Anti-Hollow 清单:
- JobCoordinator 注册到 ClusterRegistry
- TaskManager 注册到 ClusterRegistry 并续约 lease
- Task assignment 通过控制 topic 下发(携带 fencing token)
- Barrier 触发通过控制 topic 发送到 source task
- Checkpoint ACK 通过控制 topic 返回 coordinator
- EpochManifest 包含 fencing token + planFingerprint
- Fencing token 在 assignment/barrier/ACK/commit 中全程验证
- Lease 过期触发全局恢复
- SourceEnumeratorState 进入 EpochManifest
- Subsuming contract 正确(commit ≤ epochId 的 pending transaction)
- 无静默跳过:无空方法体、无吞异常、无 no-op
-
./mvnw test -pl nop-stream -am全部通过 -
ai-dev/logs/对应日期条目已更新
Phase Status
| Phase | 名称 | Depends on | Status |
|---|---|---|---|
| 0 | 信封与序列化基础设施 | Plan 44 | completed |
| 1 | source-pull barrier 注入 | Plan 44 | completed |
| 2 | 并行度支持(本地 queue) | Phase 0, Phase 1 | completed |
| 3 | 消息传输层(RemoteResultPartition/RemoteInputChannel) | Phase 2 | completed |
| 4 | ClusterRegistry | 无 | completed |
| 5 | TaskManager | Phase 3, Phase 4 | completed |
| 6 | JobCoordinator + fencing + 全局恢复 + JobTerminationMode | Phase 3, Phase 4, Phase 5 | completed |
| 7 | 连接器分布式协议 | Phase 6 | completed |
| 8 | 全链路分布式验证 | Phase 0-7 | completed |
并行可能性:Phase 0/1/4 互相独立可并行。Phase 2 依赖 Phase 0+1。Phase 3 依赖 Phase 2。Phase 5 依赖 Phase 3+4。Phase 6 依赖 Phase 5。Phase 7 依赖 Phase 6。Phase 8 依赖全部。
Closure Gates
关闭条件:只有本 section 所有条目以及每个 Phase 的 Exit Criteria 全部勾选为
[x]后,才能将Plan Status改为completed。
- 所有 9 个 Phase(Phase 0-8)的 Exit Criteria 全部满足
- 15 条设计不变量(architecture.md §9)全部有端到端测试验证
- 分布式端到端测试通过(2+ 节点 TaskManager)
- 分布式恢复测试通过(一节点故障 → 恢复 → 继续处理)
- 不存在被静默降级到 deferred / follow-up 的 in-scope live defect
- 受影响的 owner docs 已同步:
architecture.md、checkpoint-design.md、graph-model-design.md、connector-design.md、state-management-design.md、core-design.md、component-roadmap.md、../design/nop-stream/time-model-design.md、window-design.md - 独立子 agent closure-audit 已完成并记录证据
- Anti-Hollow Check:closure audit 验证分布式控制面所有组件在运行时被调用
-
./mvnw compile -pl nop-stream -am -
./mvnw test -pl nop-stream -am - checkstyle / 代码规范检查通过
-
ai-dev/logs/记录每个 Phase 的执行日志
Deferred But Adjudicated
Unaligned checkpoint
- Classification:
optimization candidate - Why Not Blocking Closure: aligned checkpoint 是 exactly-once 正确性的基线,unaligned 是性能优化。设计文档明确 "Aligned checkpoint 是基线能力"
- Successor Required:
yes - Successor Path: 待性能优化阶段
局部恢复(Region failover)
- Classification:
optimization candidate - Why Not Blocking Closure: 全局恢复是基线策略,设计文档 §5.2 "初始版本采用全局恢复和重新部署;局部恢复是后续优化"
- Successor Required:
yes - Successor Path: 待全局恢复稳定后优化
CREDIT_BASED / ACK_WINDOW 流控
- Classification:
optimization candidate - Why Not Blocking Closure: 消息队列(IMessageService)自带背压机制,不需要应用层流控。分布式场景下 BLOCKING_QUEUE + 消息队列背压已足够
- Successor Required:
yes - Successor Path: 待需要更高吞吐时实现
二进制序列化(Protobuf / Avro)
- Classification:
optimization candidate - Why Not Blocking Closure: JSON 序列化满足功能正确性要求。性能优化是后续工作
- Successor Required:
no - Successor Path: 待需要时可替换 StreamElementCodec
Rescale / state redistribution
- Classification:
optimization candidate - Why Not Blocking Closure: 并行度固定场景下不需要 rescale
- Successor Required:
yes - Successor Path: 待需要动态调整并行度时实现
Non-Blocking Follow-ups
- CEP operator 对接标准状态后端
- nop-stream-flow(XDSL 编排)
- nop-stream-flink(外部后端适配)
- 常驻 TaskManager daemon(当前按作业生命周期管理)
- Coordinator HA(当前 coordinator 是单点,利用 ClusterRegistry 的 coordinator 注册实现快速切换)
- 网络层性能优化(批量化发送、压缩、零拷贝)
Closure
Status Note: All 9 phases (Phase 0-8) completed. Independent closure audit verified all exit criteria met, all 15 design invariants verified, anti-hollow checks passed.
Closure Audit Evidence:
- Reviewer / Agent: Independent sub-agent (GLM-5.1, separate task)
- Evidence: Full source code audit confirmed all changes exist and are substantive. Distributed test suite passes.
Follow-up:
- CEP operator 对接标准状态后端
- nop-stream-flow(XDSL 编排)
- nop-stream-flink(外部后端适配)
- 常驻 TaskManager daemon(当前按作业生命周期管理)
- Coordinator HA(当前 coordinator 是单点,利用 ClusterRegistry 的 coordinator 注册实现快速切换)
- 网络层性能优化(批量化发送、压缩、零拷贝)