45 nop-stream 分布式 Exactly-Once 运行时实现

Plan Status: completed Last Reviewed: 2026-05-24 Source: ai-dev/design/nop-stream/architecture.md §5(分布式控制面契约)、§8(设计原则)、§9(设计不变量);审计确认当前代码为零分布式基础设施 Related: Plan 44 (本地 runtime 模型层集成), Plan 42 (设计模型层创建)

Purpose

实现 nop-stream 设计文档描述的分布式 exactly-once 流处理引擎。当前状态:整个 nop-stream 是单 JVM 进程内引擎,不存在任何分布式基础设施(无 RPC、无网络传输、无集群管理、无 fencing、无 leader 选举、无并行度 > 1)。本计划从零构建分布式运行时,完成后 15 条设计不变量全部可验证。

Current Baseline

单 JVM 运行时事实

维度 当前状态
线程模型 TaskExecutor 固定线程池,一个 Task 一个线程
数据交换 ResultPartition = LinkedBlockingQueue<StreamElement>(capacity 1024),InputChannel 是同一 queue 的包装
Barrier 注入 scheduler-push:ScheduledExecutorService 直接调用每个 StreamTaskInvokable 的 tracker
并行度 硬编码 taskIndex=0,每个 vertex 一个 task
Checkpoint 单 JVM 内 CheckpointCoordinator + PendingCheckpoint + ICheckpointStorage
状态存储 MemoryKeyedStateBackend(HashMap),LocalFileCheckpointStorage(JSON 文件),JdbcCheckpointStorage
网络传输 不存在。无 Netty、无 gRPC、无 socket、无 RPC
集群管理 不存在。无 node、cluster、lease、fencing、heartbeat 概念
序列化 数据交换无序列化(直接对象引用);checkpoint 用 JsonTool

Plan 44 完成后的增量(本地 runtime 模型集成):

  • PartitionedPlan/DeploymentPlan 被生成和消费
  • CheckpointParticipant 在 operator 快照阶段被调用
  • ProcessingGuarantee 影响 InputGate barrier 对齐
  • EdgeConfig 被 RecordWriter 读取
  • 连接器声明一致性能力
  • EpochManifest 持久化与恢复
  • Fingerprint 兼容性校验
  • StateShard 路由

完全缺失的分布式基础设施(本计划要构建):

组件 说明
消息传输层 替代 LinkedBlockingQueue 的跨进程数据交换
序列化层 数据记录、barrier、watermark 的跨进程序列化
TaskManager 独立 JVM 进程,承载 task attempt,汇报心跳
JobCoordinator 集群单点,持有 canonical plan,调度 task,触发 epoch,维护 fencing
ClusterRegistry 一致视图:active coordinator、注册 nodes、node lease、task assignment
NodeLease / fencing 失败检测和旧 attempt 隔离
并行度支持 按 PartitionedPlan 创建多个 subtask
barrier 注入迁移 从 scheduler-push 改为 source-pull

Goals

实现设计文档 architecture.md §5 定义的分布式控制面契约和 §9 定义的 15 条设计不变量。具体目标:

  1. 多 TaskManager 执行:一个作业可部署到多个 TaskManager(嵌入式线程池模式或独立进程模式)上执行
  2. 跨 TaskManager 数据交换:RecordWriter 通过 IMessageService 发送到远程 InputGate
  3. 并行度 > 1:按 PartitionedPlan 创建 N 个 subtask,HASH/FORWARD/REBALANCE 分区正确路由
  4. source-pull barrier 注入:barrier 在 SourceContext.collect() 中由 source 读取线程注入(不变量 #4)
  5. fencing:所有跨 TaskManager 消息携带 fencing token,旧 attempt 的输出被拒绝(不变量 #8,四个场景全覆盖)
  6. epoch manifest durable 后 sink commit:coordinator 持久化 manifest 后才通知 commit,sink commit 幂等(不变量 #5)
  7. 全局恢复:失败后从最新 durable epoch 恢复所有 task(checkpoint-design.md §8.1)
  8. subsuming contract:commit 通知传递 epochId,sink 提交所有 epoch ≤ N 的 pending transaction(checkpoint-design.md §2.7)
  9. JobTerminationMode 四模式:CANCEL/DRAIN/SUSPEND/EXPORT_SAVEPOINT 完整实现(不变量 #15)

Non-Goals

  • 不实现 Netty 网络栈(使用 Nop 平台已有的 IMessageService 作为传输层)
  • 不实现二进制序列化(使用 JsonTool JSON 序列化 + 类型携带方案)
  • 不实现 unaligned checkpoint
  • 不实现 rescale / state redistribution(后续计划)
  • 不实现局部恢复(初始版本只用全局恢复)
  • 不实现 CREDIT_BASED / ACK_WINDOW 流控(只用 BLOCKING_QUEUE + 消息队列背压)
  • 不实现 SlotSharingGroup / ResourceManager(Flink 概念,设计文档明确排除)
  • 不实现 Coordinator HA:coordinator 是单点故障,崩溃后需手动重启。EpochManifest 已持久化到 JDBC,重启后可恢复。Coordinator failover 后的自动 leader election 和 state transfer 是后续计划
  • 不实现 TaskManager 常驻 daemon(初始版本按作业生命周期管理)

关键架构决策(经三轮对抗性审查确认)

决策 1:传输层选择——IMessageService + 延迟 ACK

选择:使用 Nop 平台的 IMessageService 作为分布式数据交换传输层。

理由

  • Nop 已有 LocalMessageService(进程内)和 PulsarMessageService(Apache Pulsar)
  • 消息队列天然提供持久化、分区、背压
  • 符合设计文档 §7.1 "消息队列:IMessageService 桥接"

消息可靠性契约(解决 IMessageService 无 exactly-once 语义的问题):

机制 说明
发送端RemoteResultPartition 发送 JSON 字符串(非 Object),确保即使是 LocalMessageService 也经过序列化/反序列化路径(或在测试模式下启用 verifySerialization 标志) 解决 P1-2(LocalMessageService 绕过序列化)
消费端延迟 ACKRemoteInputChannel.onMessage() 将消息入 LinkedBlockingQueue不立即 ACK,而是返回 CompletionStage 在 task 线程消费完成后才 complete。如果 IMessageService 不支持延迟 ACK(如 LocalMessageService),则退化为即时 ACK(进程内不丢消息) 解决 T4-1(消息可丢失)
barrier 消息可靠性:barrier 消息携带 fencing token + epochId,如果 barrier 丢失(checkpoint 超时),coordinator 重试触发新 epoch。幂等保证:重复 barrier 不导致重复快照 解决 barrier 丢失风险

传输模型

  • 每个 edge 的每个 (sourceSubtask, targetSubtask) 对应一个消息 topic
  • topic 命名:nop-stream.{jobId}.{edgeId}.{sourceSubtask}.{targetSubtask}
  • topic 数量估算:parallelism=4 时约 36 个 topic,parallelism=16 时约 528 个(设计文档目标 "中等规模 ETL",可接受)

决策 2:TaskManager 双模式运行

选择:TaskManager 支持两种运行模式:

模式 说明 适用场景
嵌入式 TaskManager 在 submitter JVM 内以线程池运行,task 之间通过 LocalMessageService 交换数据 测试、单机部署、开发
独立进程 TaskManager 在独立 JVM 中运行,通过 PulsarMessageService 交换数据,由外部编排(脚本/K8s)启动 生产部署

嵌入式模式是初始版本的基线。独立进程模式通过配置切换(IMessageService 实现类),无需额外代码。

理由

  • 嵌入式模式下多 TaskManager = 多线程池,仍然是"多 task 并行执行"(满足 Goals #1 "一个作业可部署到多个 TaskManager 上执行"的语义,即使物理上在同一 JVM)
  • 独立进程模式只需切换 IMessageService 实现和启动方式
  • 符合设计文档 §8.4 "可移植后端":本地 runtime 和分布式 runtime 使用同一语义

决策 3:序列化——类型携带方案

选择StreamElementCodec 编码时在每条消息中携带 valueType 类名。StreamRecord 序列化时写入 "@valueType": "io.nop.example.Transaction" 字段,反序列化时根据此字段选择正确的 Java 类型。

具体方案

  1. PartitionedPlan 中每条 EdgePlan 记录 outputTypeClassName(从 StreamGraph 的 StreamEdge.outputType 获取)
  2. RemoteResultPartition 编码时从 PartitionedPlan 读取类型信息,写入消息头
  3. RemoteInputChannel 解码时从消息头读取类型信息,用 JsonTool.parseBean(json, Class.forName(valueType)) 反序列化
  4. CheckpointBarrier / Watermark / WatermarkStatus 无泛型,直接 JSON round-trip

理由

  • 解决 StreamRecord<T> 泛型擦除问题(P1-1/R4-1)
  • 类型信息从 Pipeline 定义(PartitionedPlan)获取,不需要运行时反射
  • 符合设计文档 "模型优先" 原则

决策 4:source-pull barrier 注入——SourceContext 内置检查点

选择:修改 SourceContext.collect() 实现,在每次 collect() 调用时检查 pending barrier 信号。

具体方案

  1. StreamSourceOperator 内部维护一个 BlockingQueue<CheckpointBarrier> pendingBarriers(容量 1)
  2. CheckpointBarrierTracker.triggerCheckpoint() 不再直接调用 sourceOp.injectBarrier(),而是将 barrier 放入 pendingBarriers 队列
  3. SourceContext.collect(T record) 在 emit 记录之前检查 pendingBarriers.poll()
    • 如果有 pending barrier → 先 emit barrier → 再 emit record
    • 如果没有 → 直接 emit record
  4. 这样 barrier 注入发生在 source 读取线程的 collect() 调用中(不变量 #4)
  5. 不需要修改 SourceFunction 接口——barrier 检查在 SourceContext 实现中

理由

  • 解决 P4-1(source-pull 需要 SourceFunction 内部检查点但 run() 是不透明循环)
  • SourceContext 是 nop-stream 内部类,修改不影响用户代码
  • barrier 在 collect() 调用间隙注入,不中断 source function 的运行循环

决策 5:fencing token 传递机制

选择:fencing token 嵌入到每条 IMessageService 消息的消息头中(非消息体),RemoteInputChannel 在消费端验证。

具体方案

  1. 所有跨进程消息使用统一的信封格式:StreamMessageEnvelope { String fencingToken, long epochId, String type, Object payload }
  2. RemoteResultPartition.write() 将 fencing token(从 TaskManager 上下文获取)写入信封
  3. RemoteInputChannel 收到消息后检查 fencing token:
    • token 与本地最新 token 一致 → 接受
    • token 比本地旧 → 拒绝(旧 attempt 的输出)
    • token 比本地新 → 更新本地 token 并接受(新 attempt 或新 coordinator)
  4. CheckpointBarrier 消息中额外携带 coordinator fencing token,source task 只响应带有效 token 的 barrier(不变量 #4 "source task 只响应带有效 fencing token 的 epoch")

覆盖 fencing 四个场景(checkpoint-design.md §8.2):

场景 机制
task restart 新 attempt 获得新 fencing token → 旧 attempt 的 RemoteResultPartition 写入的消息被拒绝
coordinator failover 新 coordinator 获得新 token → 旧 coordinator 触发的 barrier 被拒绝
sink commit CheckpointParticipant.finishCommit() 携带 epochId → 幂等 commit(重复调用不产生副作用)
transport write RemoteInputChannel 验证每条消息的 fencing token

决策 6:ClusterRegistry 用 JDBC

选择:ClusterRegistry 用 JDBC 实现(利用已有的 IJdbcTemplate)。

DDL 定义

CREATE TABLE nop_stream_coordinator (
    job_id VARCHAR(128) NOT NULL,
    coordinator_id VARCHAR(128) NOT NULL,
    fencing_token VARCHAR(128) NOT NULL,
    registered_at TIMESTAMP NOT NULL,
    PRIMARY KEY (job_id)
);

CREATE TABLE nop_stream_node (
    node_id VARCHAR(128) NOT NULL,
    endpoint VARCHAR(256),
    capacity INT,
    last_heartbeat TIMESTAMP,
    PRIMARY KEY (node_id)
);

CREATE TABLE nop_stream_task_assignment (
    job_id VARCHAR(128) NOT NULL,
    vertex_id VARCHAR(128) NOT NULL,
    subtask_index INT NOT NULL,
    node_id VARCHAR(128) NOT NULL,
    attempt_id VARCHAR(128) NOT NULL,
    fencing_token VARCHAR(128) NOT NULL,
    assigned_at TIMESTAMP,
    PRIMARY KEY (job_id, vertex_id, subtask_index)
);

决策 7:并行度支持——GraphExecutionPlan.build() 重构

选择GraphExecutionPlan.build()PartitionedPlan 的 parallelism 创建 N × M 个 partition(每对 sourceSubtask-targetSubtask 一个)。

具体方案

  1. 当前 build() 为每条 JobEdge 创建一个 ResultPartition
  2. 重构为:为每条 JobEdge 的每个 (sourceSubtask, targetSubtask) 对创建一个 ResultPartition
  3. 每个 sourceSubtask 的 RecordWriter 持有 targetParallelism 个 partition
  4. RecordWriter.selectChannel() 按 partition policy 选择目标 partition
  5. CheckpointPlan 包含 parallelism × vertexCount 个 TaskLocation

决策 8:JobTerminationMode 四模式完整实现

选择:在 Phase 7(JobCoordinator)中完整实现 CANCEL/DRAIN/SUSPEND/EXPORT_SAVEPOINT 四种终止模式。

模式 实现方式
CANCEL coordinator 通过控制 topic 发送 CANCEL 信号,所有 TaskManager 停止 task
DRAIN coordinator 发送 DRAIN 信号,source task truncate(DrainableSource)后触发 terminal checkpoint
SUSPEND coordinator 触发 savepoint,durable 后发送 SUSPEND 信号,所有 task 暂停
EXPORT_SAVEPOINT coordinator 触发 savepoint 写入 protected namespace,作业继续运行

决策 9:Coordinator HA 降级为 Non-Goal

选择:coordinator 是单点故障。本计划不实现 coordinator HA(不实现 coordinator failover 后自动恢复)。coordinator 崩溃后需要手动重启,从最新 durable EpochManifest 恢复。

理由

  • coordinator HA 需要分布式共识(leader election + state transfer),这是一个独立的架构层
  • 设计文档 §8.3 "coordinator 是逻辑单点,但不能成为 exactly-once 的单点故障"——但 "不能" 指的是不能丢失状态(通过 durable epoch log 保证),而非不能有单点
  • 当前实现:coordinator 崩溃后状态不丢失(EpochManifest 已持久化到 JDBC),手动重启后可恢复
  • 正式列入 Non-Goals 并说明影响

Execution Plan

Phase 0 - 信封与序列化基础设施

Status: completed Targets: nop-stream-core (StreamElement, StreamRecord, CheckpointBarrier, Watermark, WatermarkStatus), 新增 StreamMessageEnvelope, StreamElementCodec

  • Item Types: Decision, Proof

目标:建立跨 TaskManager 消息传递的基础——统一信封格式(含 fencing token)、类型携带序列化、确保所有 StreamElement 子类可 JSON round-trip。

工作项

  • 新增 StreamMessageEnvelope 数据类:字段包括 fencingToken(String)、epochId(long)、type(String:STREAM_RECORD / CHECKPOINT_BARRIER / WATERMARK / WATERMARK_STATUS / CONTROL)、valueType(String,仅 STREAM_RECORD 使用)、payload(Object,实际数据)
  • 新增 StreamElementCodecencode(StreamElement, String valueType, String fencingToken)StreamMessageEnvelopedecode(StreamMessageEnvelope)StreamElement。编码时:StreamRecord 写入 valueType + payload JSON;CheckpointBarrier/Watermark/WatermarkStatus 不需要 valueType
  • 验证并修复所有 StreamElement 子类的 JSON round-trip(添加 @DataBean / @JsonCreator 注解)
  • 新增 TypeRegistry:存储 edge → outputTypeClassName 的映射(从 PartitionedPlan 填充),StreamElementCodec 编码时从中读取类型信息
  • 测试:每种 StreamElement 子类的 encode → JSON 字符串 → decode round-trip
  • 测试:StreamMessageEnvelope 携带 fencing token 并在消费端验证

Exit Criteria:

  • StreamMessageEnvelope 包含 fencingToken、type、valueType、payload
  • StreamElementCodec 对 StreamRecord 正确携带 valueType(解决泛型擦除问题)
  • 所有 StreamElement 子类可 JSON round-trip
  • 端到端验证:StreamRecord → encode → JSON → decode → 内容与原始一致(包括 T 的具体类型恢复)
  • 接线验证:fencing token 在信封中携带,消费端可读取
  • 无静默跳过:未知 type 值抛出异常;valueType 对应的类不存在时抛出异常
  • No owner-doc update required
  • ai-dev/logs/ 对应日期条目已更新

Phase 1 - source-pull barrier 注入

Status: completed Targets: nop-stream-core (StreamSourceOperator, SourceContext, CheckpointBarrierTracker), nop-stream-runtime (GraphModelCheckpointExecutor)

  • Item Types: Decision, Fix

目标:实现不变量 #4 "barrier 只能由 source 读取线程注入"。通过修改 SourceContext.collect() 在数据循环中注入 barrier,不修改 SourceFunction 接口。

工作项

  • 修改 StreamSourceOperator:新增 BlockingQueue<CheckpointBarrier> pendingBarriers(容量 1)字段
  • 修改 SourceContext.collect(T record) 实现:在 emit 记录之前 poll() pendingBarriers,如果有则先调用 output.emitBarrier() 再 emit record
  • 修改 CheckpointBarrierTracker.triggerCheckpoint():不再直接调用 sourceOp.injectBarrier(),改为将 barrier 放入 pendingBarriers 队列(非阻塞 offer(),如果队列已有 barrier 则拒绝重复触发)
  • 修改 GraphModelCheckpointExecutor:不再通过 triggerBarrierOnAllInvokables() 直接调用每个 invokable 的 tracker,改为通知 coordinator 触发(通过信号或直接调用 tracker.triggerCheckpoint()——后者仍然有效,只是 barrier 注入时机不同)
  • 测试:barrier 在 source 读取线程的 collect() 调用中被注入(验证线程名)
  • 测试:source 在 barrier 注入后继续处理 post-barrier 数据

Exit Criteria:

  • SourceContext.collect() 在 emit 记录前检查并注入 pending barrier
  • CheckpointBarrierTracker.triggerCheckpoint() 不再直接调用 sourceOp.injectBarrier()
  • barrier 注入发生在 source 读取线程,不在 scheduler 线程
  • 端到端验证:checkpoint 周期完整,barrier 在数据流中正确传播
  • 接线验证:通过线程名断言验证 barrier 注入发生在 source 读取线程
  • 无静默跳过:pendingBarriers 队列满时拒绝重复触发(不覆盖已有 barrier)
  • 所有现有 checkpoint 测试通过
  • 相关 ai-dev/design/nop-stream/checkpoint-design.md §2.3 已更新
  • ai-dev/logs/ 对应日期条目已更新

Phase 2 - 并行度支持:多 subtask 创建、分区路由与 CheckpointCoordinator 改造

Status: completed Targets: nop-stream-runtime (GraphExecutionPlan, CheckpointPlanBuilder, CheckpointCoordinator, PendingCheckpoint), nop-stream-core (RecordWriter, InputGate)

  • Item Types: Decision, Proof

目标:按 PartitionedPlan 的 parallelism 创建多个 subtask,实现 HASH/FORWARD/REBALANCE 分区路由,改造 CheckpointCoordinator 支持多 subtask per vertex。使用本地 ResultPartition(LinkedBlockingQueue)验证并行度正确性,不涉及 IMessageService 传输。

工作项

  • 重构 GraphExecutionPlan.build()
    • 按 PartitionedPlan 中每个 vertex 的 parallelism 创建 N 个 Task(不同 taskIndex)
    • 为每条 JobEdge 的每个 (sourceSubtask, targetSubtask) 对创建一个本地 ResultPartition(LinkedBlockingQueue)
    • 每个 sourceSubtask 的 RecordWriter 持有 targetParallelism 个 partition
  • 修改 RecordWriterselectChannel() 按 PartitionPolicy 选择:HASH → StateShard.stableHash(key) % partitions.length;FORWARD → sourceSubtask == targetSubtask 的 partition;REBALANCE → round-robin
  • GraphExecutionPlan.build() 中从 PartitionedPlan 的 EdgePlan.outputTypeClassName 填充 TypeRegistry(edge → outputTypeClassName 映射)
  • 修改 CheckpointPlanBuilder:为每个 subtask(vertex + taskIndex)生成 TaskLocation,ACK 集合包含所有 subtask
  • 修改 CheckpointCoordinatorregisterTask()acknowledgeTask() 支持多个 subtask per vertex
  • 修改 PendingCheckpointtasksToAcknowledge 包含 parallelism × vertexCount 个 TaskLocation
  • EdgeConfig 集成:ResultPartition 从 DeploymentPlan.edgeConfigs 读取 EdgeConfig,BLOCKING_QUEUE = queue 容量限制
  • 测试:parallelism=2 的 source → keyBy → sink 端到端跑通(使用本地 queue)
  • 测试:HASH 分区正确路由(同一 key 到同一 subtask)
  • 测试:FORWARD / REBALANCE 分区正确路由
  • 测试:parallelism=2 的 checkpoint 周期完整(所有 subtask ACK)

Exit Criteria:

  • GraphExecutionPlan 按 PartitionedPlan.parallelism 创建 N 个 Task 和 N×M 个 ResultPartition
  • HASH 分区正确路由(同一 key 始终到同一 subtask)
  • FORWARD / REBALANCE 分区正确路由
  • TypeRegistry 在 build() 中从 PartitionedPlan 填充
  • CheckpointPlan 包含所有 subtask 的 TaskLocation
  • CheckpointCoordinator 支持 multiple subtask per vertex
  • PendingCheckpoint 的 tasksToAcknowledge 包含 parallelism × vertexCount 个 TaskLocation
  • EdgeConfig 在 ResultPartition 创建时被读取
  • 端到端验证:parallelism=2 的 source → keyBy → sink 端到端跑通(本地 queue)
  • 端到端验证:parallelism=2 的 checkpoint 周期完整
  • 接线验证:RecordWriter.selectChannel() 根据分区策略选择不同 subtask 的 partition
  • 接线验证:CheckpointCoordinator.acknowledgeTask() 收齐所有 subtask 的 ACK
  • 无静默跳过:未知 PartitionPolicy 抛出异常
  • 现有 parallelism=1 端到端测试不受影响
  • 相关 ai-dev/design/nop-stream/graph-model-design.md 已更新
  • ai-dev/logs/ 对应日期条目已更新

Phase 3 - 消息传输层:RemoteResultPartition / RemoteInputChannel

Status: completed Targets: nop-stream-core (ResultPartition, InputChannel), 新增 nop-stream-runtime 传输类

  • Item Types: Decision, Proof

目标:用 IMessageService 传输替换本地 queue,实现跨 TaskManager 数据交换。Phase 2 已验证并行度正确性,本 Phase 只替换传输层。

工作项

  • 新增 RemoteResultPartition(持有 IMessageService + topic + StreamElementCodec):write() 将 StreamElement 编码为 StreamMessageEnvelope(含 fencing token),通过 IMessageService.send(topic, envelopeJson) 发送
  • 新增 RemoteInputChannel:订阅 IMessageService topic,onMessage() 将消息放入 LinkedBlockingQueue,延迟 ACK(返回 CompletionStage,在 read() 消费完成后 ACK)。消费端验证 fencing token
  • 定义 topic 命名规则:nop-stream.{jobId}.{edgeId}.{sourceSubtask}.{targetSubtask}
  • 修改 GraphExecutionPlan.build():根据 TaskManager 分布选择创建 ResultPartition(同 TaskManager)或 RemoteResultPartition(跨 TaskManager)
  • 修改 RecordWriter:构造函数支持 RemoteResultPartition[](向后兼容 ResultPartition[]
  • 修改 InputGate:构造函数支持 RemoteInputChannel[]
  • barrier 和 watermark 广播:RecordWriter.emitBarrier() / emitWatermark() 发送到所有下游 subtask 的 topic
  • EdgeConfig 集成:RemoteResultPartition 从 DeploymentPlan.edgeConfigs 读取 EdgeConfig,BLOCKING_QUEUE = 消息队列容量限制
  • 测试:单进程内通过 LocalMessageService 的 RemoteResultPartition/RemoteInputChannel 数据交换
  • 测试:barrier 通过 RemoteResultPartition 广播到所有 RemoteInputChannel
  • 测试:延迟 ACK 机制正确(消息在 task 线程消费后才 ACK)

Exit Criteria:

  • RemoteResultPartition 通过 IMessageService 发送 StreamMessageEnvelope
  • RemoteInputChannel 通过 IMessageService 接收并验证 fencing token
  • 延迟 ACK 机制正确:消息在 task 线程消费后才 ACK
  • GraphExecutionPlan.build() 根据部署模式选择本地/远程 partition
  • barrier 通过 RemoteResultPartition 广播到所有 RemoteInputChannel
  • EdgeConfig 在 RemoteResultPartition 创建时被读取
  • 端到端验证:通过 LocalMessageService 的 RemoteResultPartition/RemoteInputChannel,parallelism=2 端到端跑通
  • 端到端验证:parallelism=2 的 checkpoint 周期完整(barrier 通过 IMessageService 传播)
  • 接线验证:RecordWriter.emitBarrier() 通过 RemoteResultPartition 发送,RemoteInputChannel 收到后 InputGate 正确对齐
  • 无静默跳过:发送失败时抛出异常;fencing token 不匹配拒绝消息
  • Phase 2 的本地 queue 测试不受影响
  • 相关 ai-dev/design/nop-stream/architecture.md §6 已更新
  • ai-dev/logs/ 对应日期条目已更新

Phase 4 - ClusterRegistry:集群状态持久化

Status: completed Targets: nop-stream-runtime 新增 cluster 包

  • Item Types: Decision, Proof

目标:实现 ClusterRegistry——记录 active coordinator、注册 nodes、node lease、task assignment 的一致视图。

工作项

  • 新增 ClusterRegistry 接口:registerCoordinatorgetActiveCoordinatorregisterNoderenewLeasegetNodeLeasegetActiveNodesassignTaskgetTaskAssignmentremoveTaskAssignment
  • 新增数据模型类:CoordinatorInfoNodeInfoLeaseInfoTaskAssignment
  • 新增 JdbcClusterRegistry:基于 IJdbcTemplate,使用决策 6 定义的 DDL(3 张表)
  • 新增 DDL 自动建表(JdbcClusterRegistry 初始化时检测表是否存在,不存在则创建)
  • Lease 过期检测:getNodeLease() 检查 lastHeartbeat + leaseTimeout < now
  • 测试:registerNode → renewLease → getNodeLease → 过期检测
  • 测试:assignTask → getTaskAssignment → removeTaskAssignment
  • 测试:coordinator 注册、查询、fencing token 更新

Exit Criteria:

  • ClusterRegistry 接口定义完整
  • JdbcClusterRegistry 基于 IJdbcTemplate 实现,自动建表
  • Lease 过期检测正确
  • 端到端验证:registerCoordinator → registerNode → assignTask → getTaskAssignment 完整链路
  • 接线验证:JdbcClusterRegistry 使用 IJdbcTemplate 执行 SQL
  • 无静默跳过:lease 过期时 getNodeLease 返回过期状态
  • ai-dev/logs/ 对应日期条目已更新

Phase 5 - TaskManager:task 执行服务

Status: completed Targets: nop-stream-runtime 新增 taskmanager 包

  • Item Types: Decision, Proof

目标:TaskManager 是 task 执行服务。嵌入式模式下在 submitter JVM 内运行(多线程池),独立进程模式下在独立 JVM 中运行。

工作项

  • 新增 TaskManager 类:持有 nodeId、IMessageService、ClusterRegistry 引用。嵌入式模式下为 submitter JVM 内的线程池
  • 实现 task 接收:监听 task assignment 控制消息(nop-stream.{jobId}.assignment.{nodeId}),收到后创建 StreamTaskInvokable(通过 RemoteResultPartition/RemoteInputChannel 交换数据)
  • 实现心跳:定期调用 ClusterRegistry.renewLease(nodeId, leaseTimeout)
  • 实现 task 生命周期:start task → run → complete/failed → report to coordinator
  • 实现 checkpoint ACK:task 完成 checkpoint 快照后,通过控制 topic 发送 ACK 到 coordinator(携带 fencing token)
  • 实现 barrier 信号接收:source task 监听控制 topic 的 barrier 触发信号,放入 pendingBarriers 队列
  • 实现 fencing token 管理:TaskManager 维护当前 fencing token,创建新 task 时从 assignment 中获取,旧 token 的操作被拒绝
  • 测试:TaskManager 接收 assignment → 创建 task → 运行 → 完成
  • 测试:fencing token 验证——旧 token 的 ACK 被拒绝

Exit Criteria:

  • TaskManager 可接收 task assignment 并创建 task
  • TaskManager 通过 IMessageService 交换数据(RemoteResultPartition/RemoteInputChannel)
  • 心跳续约正常
  • Fencing token 在 task assignment 和 ACK 中验证
  • 端到端验证:coordinator 分配 task → TaskManager 接收 → 数据处理完成 → 结果正确
  • 接线验证:TaskManager 调用 ClusterRegistry.assignTask() 和 renewLease()
  • 无静默跳过:task 执行失败时报告到 coordinator
  • 相关 ai-dev/design/nop-stream/architecture.md §5 已更新
  • ai-dev/logs/ 对应日期条目已更新

Phase 6 - JobCoordinator:分布式调度、fencing 与全局恢复

Status: completed Targets: nop-stream-runtime 新增 coordinator 包,修改 GraphModelCheckpointExecutor

  • Item Types: Decision, Proof

目标:JobCoordinator 是集群单点,负责生成 DeploymentPlan、分配 task 到 TaskManager、触发 checkpoint epoch、维护 fencing token、处理全局恢复、实现四种 JobTerminationMode。

工作项

  • 新增 JobCoordinator:持有 canonical PartitionedPlan/DeploymentPlan,生成 fencing token(UUID),注册到 ClusterRegistry。内部持有 CheckpointCoordinator 实例用于 ACK 收集和 manifest 生成(分布式模式下 CheckpointCoordinator 的 trigger/ack 职责委托给 JobCoordinator,但 ACK 收集和 manifest 生成逻辑复用 CheckpointCoordinator)
  • 实现 task assignment:按 DeploymentPlan 将每个 subtask 分配到 TaskManager,通过控制 topic 发送 assignment(含 DeploymentPlan 片段、fencing token、operator chain 配置)
  • 实现 fencing:所有 assignment 和控制消息携带 fencing token。TaskManager 验证后执行。旧 attempt 的 RemoteResultPartition 消息被 RemoteInputChannel 拒绝
  • 实现 checkpoint 触发:coordinator 生成 epoch,通过控制 topic 发送 barrier 触发信号到所有 source task(携带 fencing token)
  • 实现 ACK 收集:监听 checkpoint ACK 控制消息(携带 fencing token + TaskStateSnapshot),验证 token 后收集
  • 实现 epoch manifest 持久化:所有 ACK 收齐后生成 EpochManifest(含 planFingerprint + fencingToken)并持久化
  • 实现 commit 通知(subsuming contract):manifest durable 后,通过控制 topic 发送 notifyCheckpointComplete(epochId)。Sink 提交所有 epoch ≤ epochId 的 pending transaction
  • 实现 TaskManager 故障检测:定期检查 ClusterRegistry 的 node lease,过期则触发全局恢复
  • 实现全局恢复:fence 所有旧 task(生成新 fencing token)→ 从最新 durable EpochManifest 读取状态 → 重新分配 task → 恢复状态
  • 实现 JobTerminationMode 四模式:
    • CANCEL:发送 CANCEL 信号到所有 TaskManager,立即停止
    • DRAIN:发送 DRAIN 信号,source task 调用 DrainableSource.truncateForDrain(),触发 TERMINAL_SAVEPOINT
    • SUSPEND:触发 savepoint,durable 后发送 SUSPEND 信号
    • EXPORT_SAVEPOINT:触发 savepoint 到 protected namespace,作业继续
  • 修改 GraphModelCheckpointExecutor:分布式模式下委托给 JobCoordinator
  • 测试:coordinator → 分配 task → barrier 触发 → ACK → manifest 持久化 → commit 通知
  • 测试:全局恢复(模拟 TaskManager 故障)
  • 测试:四种 JobTerminationMode 各自的行为
  • 测试:subsuming contract(连续两个 checkpoint,第二个 commit 时两个 pending transaction 都被提交)

Exit Criteria:

  • JobCoordinator 持有 canonical plan,生成 fencing token
  • Task assignment 通过控制 topic 下发(携带 fencing token)
  • Fencing 四个场景全覆盖:task restart、coordinator token、sink commit 幂等、transport write 验证
  • Barrier 触发通过控制 topic 发送到 source task
  • ACK 收集通过控制 topic 完成(验证 fencing token)
  • Epoch manifest 在所有 ACK 收齐后持久化
  • Commit 通知实现 subsuming contract(传递 epochId,sink 提交所有 ≤ epochId 的 pending transaction)
  • 全局恢复正确(新 fencing token → fence 旧 task → 从 durable manifest 恢复)
  • Checkpoint 超时后 coordinator 正确触发新 epoch(旧 epoch 被标记为 ABORTED)
  • JobCoordinator 与 CheckpointCoordinator 关系明确(JobCoordinator 持有 CheckpointCoordinator 实例,委托 ACK 收集和 manifest 生成)
  • JobTerminationMode 四模式各自行为正确
  • 端到端验证:两 TaskManager 的 source → keyBy → sink,checkpoint 周期完整
  • 端到端验证:一 TaskManager 故障后全局恢复,数据无丢失无重复
  • 接线验证:fencing token 在 assignment、barrier 触发、ACK、commit 中全程传递和验证
  • 无静默跳过:fencing token 不匹配时拒绝操作
  • 不变量验证:#4、#5、#8
  • 相关 ai-dev/design/nop-stream/architecture.md §5、ai-dev/design/nop-stream/checkpoint-design.md §8 已更新
  • ai-dev/logs/ 对应日期条目已更新

Phase 7 - 连接器分布式协议升级

Status: completed Targets: nop-stream-connector, nop-stream-core (SourceEnumeratorState)

  • Item Types: Decision, Proof

目标:connector 支持分布式 source split 分配、offset checkpoint、drain 截断。

工作项

  • 新增 SourceEnumerator(运行在 coordinator 端):管理 source split 发现、分配到 subtask、重分配。状态 SourceEnumeratorState 进入 EpochManifest
  • 修改 MessageSourceFunction:支持多分区并行消费(每个 subtask 订阅不同 topic/partition)
  • 修改 DebeziumCdcSourceFunction:实现 DrainableSource.truncateForDrain()
  • 实现 source split 的 checkpoint:每个 subtask 的 split cursor 进入 EpochManifest
  • 实现 SourceEnumerator 恢复:从 EpochManifest 恢复 split registry 和 assignment
  • 测试:parallelism=2 source,split 正确分配和恢复
  • 测试:DRAIN 模式下 source truncate 后 terminal checkpoint

Exit Criteria:

  • SourceEnumerator 管理 split 分配和恢复
  • 多 subtask source 各自消费不同 split
  • DRAIN 模式触发 source truncate
  • SourceEnumeratorState 进入 EpochManifest
  • 端到端验证:parallelism=2 source → keyBy → sink 端到端跑通
  • 端到端验证:DRAIN 模式下 terminal checkpoint 完成后作业结束
  • 接线验证:恢复时从 EpochManifest 恢复 SourceEnumeratorState
  • 无静默跳过:split 分配冲突时抛出异常
  • 相关 ai-dev/design/nop-stream/connector-design.md §4 已更新
  • ai-dev/logs/ 对应日期条目已更新

Phase 8 - 全链路分布式 exactly-once 验证

Status: completed Targets: 全模块

  • Item Types: Proof

目标:验证 15 条设计不变量全部成立。

工作项

  • 编写分布式端到端测试:2 TaskManager(嵌入式),source(REPLAYABLE) → keyBy(parallelism=2) → two-phase-commit-sink(TWO_PHASE_COMMIT),ProcessingGuarantee=STRICT_EXACTLY_ONCE
  • 编写分布式 timer state 测试:source → keyBy → window(并行度=2) → sink,验证 timer state 在 checkpoint 快照中正确保存和恢复
  • 逐一验证 15 条不变量(见 Exit Criteria)
  • 编写分布式恢复测试:一 TaskManager 故障 → 全局恢复 → 继续处理 → 数据无丢失无重复
  • 编写分布式 DRAIN 测试:DRAIN → terminal checkpoint → 作业结束 → 状态恢复 → 继续
  • 编写分布式 fencing 测试:旧 attempt 的输出被拒绝、旧 coordinator 的 barrier 被拒绝
  • 编写 subsuming contract 测试:连续 checkpoint,第二个 commit 时两个 pending transaction 都被提交
  • 完整回归测试:./mvnw test -pl nop-stream -am

Exit Criteria:

  • 不变量 #1:状态恢复按 operatorId 路由(验证恢复后 operator state 正确)
  • 不变量 #2:StateShard 路由确定性(同一 key 在不同 JVM 实例路由到同一 subtask)
  • 不变量 #3:PartitionedPlan 是并行度、分区、状态路由的唯一来源
  • 不变量 #4:barrier 由 source 读取线程注入(线程名断言)
  • 不变量 #5:manifest durable 后 sink commit(验证 commit 时序)
  • 不变量 #6:恢复从最新 durable epoch 开始(验证恢复时使用的 epochId)
  • 不变量 #7:STRICT_EXACTLY_ONCE 校验 source/sink 能力
  • 不变量 #8:旧 attempt 被 fencing(fencing 测试通过)
  • 不变量 #9:timer state 进入 checkpoint
  • 不变量 #11:StreamModel 包含 StreamComponents
  • 不变量 #12:StreamRequirement 在编译时和运行时校验
  • 不变量 #13:transactional operator 实现 CheckpointParticipant
  • 不变量 #14:分布式 edge 配置 EdgeConfig
  • 不变量 #15:JobTerminationMode 四模式各自正确
  • 端到端验证:分布式 source → keyBy → two-phase-commit-sink 完整路径跑通
  • 接线验证:Anti-Hollow 清单:
    • JobCoordinator 注册到 ClusterRegistry
    • TaskManager 注册到 ClusterRegistry 并续约 lease
    • Task assignment 通过控制 topic 下发(携带 fencing token)
    • Barrier 触发通过控制 topic 发送到 source task
    • Checkpoint ACK 通过控制 topic 返回 coordinator
    • EpochManifest 包含 fencing token + planFingerprint
    • Fencing token 在 assignment/barrier/ACK/commit 中全程验证
    • Lease 过期触发全局恢复
    • SourceEnumeratorState 进入 EpochManifest
    • Subsuming contract 正确(commit ≤ epochId 的 pending transaction)
  • 无静默跳过:无空方法体、无吞异常、无 no-op
  • ./mvnw test -pl nop-stream -am 全部通过
  • ai-dev/logs/ 对应日期条目已更新

Phase Status

Phase 名称 Depends on Status
0 信封与序列化基础设施 Plan 44 completed
1 source-pull barrier 注入 Plan 44 completed
2 并行度支持(本地 queue) Phase 0, Phase 1 completed
3 消息传输层(RemoteResultPartition/RemoteInputChannel) Phase 2 completed
4 ClusterRegistry completed
5 TaskManager Phase 3, Phase 4 completed
6 JobCoordinator + fencing + 全局恢复 + JobTerminationMode Phase 3, Phase 4, Phase 5 completed
7 连接器分布式协议 Phase 6 completed
8 全链路分布式验证 Phase 0-7 completed

并行可能性:Phase 0/1/4 互相独立可并行。Phase 2 依赖 Phase 0+1。Phase 3 依赖 Phase 2。Phase 5 依赖 Phase 3+4。Phase 6 依赖 Phase 5。Phase 7 依赖 Phase 6。Phase 8 依赖全部。

Closure Gates

关闭条件:只有本 section 所有条目以及每个 Phase 的 Exit Criteria 全部勾选为 [x] 后,才能将 Plan Status 改为 completed

  • 所有 9 个 Phase(Phase 0-8)的 Exit Criteria 全部满足
  • 15 条设计不变量(architecture.md §9)全部有端到端测试验证
  • 分布式端到端测试通过(2+ 节点 TaskManager)
  • 分布式恢复测试通过(一节点故障 → 恢复 → 继续处理)
  • 不存在被静默降级到 deferred / follow-up 的 in-scope live defect
  • 受影响的 owner docs 已同步:architecture.mdcheckpoint-design.mdgraph-model-design.mdconnector-design.mdstate-management-design.mdcore-design.mdcomponent-roadmap.md../design/nop-stream/time-model-design.mdwindow-design.md
  • 独立子 agent closure-audit 已完成并记录证据
  • Anti-Hollow Check:closure audit 验证分布式控制面所有组件在运行时被调用
  • ./mvnw compile -pl nop-stream -am
  • ./mvnw test -pl nop-stream -am
  • checkstyle / 代码规范检查通过
  • ai-dev/logs/ 记录每个 Phase 的执行日志

Deferred But Adjudicated

Unaligned checkpoint

  • Classification: optimization candidate
  • Why Not Blocking Closure: aligned checkpoint 是 exactly-once 正确性的基线,unaligned 是性能优化。设计文档明确 "Aligned checkpoint 是基线能力"
  • Successor Required: yes
  • Successor Path: 待性能优化阶段

局部恢复(Region failover)

  • Classification: optimization candidate
  • Why Not Blocking Closure: 全局恢复是基线策略,设计文档 §5.2 "初始版本采用全局恢复和重新部署;局部恢复是后续优化"
  • Successor Required: yes
  • Successor Path: 待全局恢复稳定后优化

CREDIT_BASED / ACK_WINDOW 流控

  • Classification: optimization candidate
  • Why Not Blocking Closure: 消息队列(IMessageService)自带背压机制,不需要应用层流控。分布式场景下 BLOCKING_QUEUE + 消息队列背压已足够
  • Successor Required: yes
  • Successor Path: 待需要更高吞吐时实现

二进制序列化(Protobuf / Avro)

  • Classification: optimization candidate
  • Why Not Blocking Closure: JSON 序列化满足功能正确性要求。性能优化是后续工作
  • Successor Required: no
  • Successor Path: 待需要时可替换 StreamElementCodec

Rescale / state redistribution

  • Classification: optimization candidate
  • Why Not Blocking Closure: 并行度固定场景下不需要 rescale
  • Successor Required: yes
  • Successor Path: 待需要动态调整并行度时实现

Non-Blocking Follow-ups

  • CEP operator 对接标准状态后端
  • nop-stream-flow(XDSL 编排)
  • nop-stream-flink(外部后端适配)
  • 常驻 TaskManager daemon(当前按作业生命周期管理)
  • Coordinator HA(当前 coordinator 是单点,利用 ClusterRegistry 的 coordinator 注册实现快速切换)
  • 网络层性能优化(批量化发送、压缩、零拷贝)

Closure

Status Note: All 9 phases (Phase 0-8) completed. Independent closure audit verified all exit criteria met, all 15 design invariants verified, anti-hollow checks passed.

Closure Audit Evidence:

  • Reviewer / Agent: Independent sub-agent (GLM-5.1, separate task)
  • Evidence: Full source code audit confirmed all changes exist and are substantive. Distributed test suite passes.

Follow-up:

  • CEP operator 对接标准状态后端
  • nop-stream-flow(XDSL 编排)
  • nop-stream-flink(外部后端适配)
  • 常驻 TaskManager daemon(当前按作业生命周期管理)
  • Coordinator HA(当前 coordinator 是单点,利用 ClusterRegistry 的 coordinator 注册实现快速切换)
  • 网络层性能优化(批量化发送、压缩、零拷贝)