44 nop-stream 分布式设计模型运行时集成

Plan Status: completed Last Reviewed: 2026-05-24 Source: Plan 42 模型层实现审计发现——35 个模型类已创建但运行时零集成;结合 ai-dev/design/nop-stream/ 全量设计文档分析 Related: Plan 42 (design-implementation, superseded by this plan)

Purpose

将 Plan 42 已创建但未接入运行时的 35 个模型类完整集成到 nop-stream 执行管线和 checkpoint 子系统中。当前状态:类型已存在,运行时行为为零。本计划完成后,从 execute() 到 checkpoint 恢复的完整路径上,每个新模型类都必须被运行时代码实际调用。

Current Baseline

Plan 42 已创建的模型类(35 个 + 9 枚举):全部存在且编译通过,869 个现有测试通过。

运行时集成审计结果(全部为 NO)

审计项 状态
CheckpointCoordinator 调用 CheckpointParticipant NO
GraphModelCheckpointExecutor 使用 EpochManifest / ProcessingGuarantee / CheckpointParticipant NO(全部 NO)
RecordWriter 读取 EdgeConfig NO
InputGate 读取 EdgeConfig 或 ProcessingGuarantee NO
StreamExecutionEnvironment.execute() 调用 PartitionedPlanGenerator / DeploymentPlanGenerator NO
CheckpointPlanBuilder 读取 StreamComponents.checkpointParticipants NO
WindowOperator 引用 WindowingStrategy / StreamComponents NO
CheckpointBarrierTracker 根据 ProcessingGuarantee 调整 barrier 行为 NO
任何 connector 声明 SourceConsistencyCapability / SinkConsistencyCapability NO
MemoryKeyedStateBackend 使用 StateShard 路由 NO
TaskEpochSnapshot 在 runtime 中被使用 NO(dead code)

已有且正常工作的运行时路径

  • execute() → StreamGraphGenerator → JobGraphGenerator → GraphExecutionPlan → TaskExecutor
  • CheckpointCoordinator + CheckpointBarrierTracker + CheckpointPlanBuilder + PendingCheckpoint
  • Source → ChainingOutput → Operator → Sink(单链/多链管线)
  • Barrier 注入(scheduler 线程触发)→ 传播 → ACK → 持久化 → 恢复
  • TwoPhaseCommitSinkFunction 的 preCommit/commit/rollback(不通过 CheckpointParticipant 接口)
  • RecordWriter 按 IPartitioner 分区(无 EdgeConfig)
  • InputGate barrier 对齐(硬编码"等待所有 channel",无 ProcessingGuarantee 分支)

关键现有类及其位置

模块
StreamExecutionEnvironment core io.nop.stream.core.environment
CheckpointCoordinator runtime io.nop.stream.runtime.checkpoint
GraphModelCheckpointExecutor runtime io.nop.stream.runtime.execution
CheckpointBarrierTracker core io.nop.stream.core.execution
CheckpointPlanBuilder runtime io.nop.stream.runtime.checkpoint
RecordWriter core io.nop.stream.core.execution
InputGate core io.nop.stream.core.execution
GraphExecutionPlan runtime io.nop.stream.runtime.execution
StreamTaskInvokable runtime io.nop.stream.runtime.execution
MemoryKeyedStateBackend core io.nop.stream.core.common.state.backend.memory
WindowOperator runtime io.nop.stream.runtime.operators.windowing
JobGraphGenerator core io.nop.stream.core.graph
StreamGraphGenerator core io.nop.stream.core.graph
PartitionedPlanGenerator core io.nop.stream.core.graph
DeploymentPlanGenerator runtime io.nop.stream.runtime.execution

Goals

  1. 执行管线扩展execute() 调用 PartitionedPlanGenerator → DeploymentPlanGenerator,生成的 DeploymentPlan 被 GraphExecutionPlan/TaskExecutor 消费
  2. CheckpointParticipant 集成:算子快照阶段调用 participant.saveState(),checkpoint 完成时 coordinator 调用 participant.finishCommit(true)。TwoPhaseCommitSinkFunction 通过 CheckpointParticipant 接口参与
  3. ProcessingGuarantee 生效:InputGate 根据 ProcessingGuarantee 决定 barrier 对齐行为(STRICT 阻塞已收到 barrier 的 channel vs AT_LEAST_ONCE 不阻塞)
  4. EdgeConfig 流控:RecordWriter 根据 EdgeConfig.flowControlPolicy 选择流控行为
  5. 连接器能力声明:5 个现有 connector 声明一致性能力,参与 StreamRequirementValidator 校验
  6. EpochManifest 持久化:CheckpointCoordinator 完成快照后生成 EpochManifest(含 planFingerprint、taskSnapshots)并持久化,恢复时优先从 EpochManifest 读取
  7. StateShard 路由:MemoryKeyedStateBackend 支持按 StateShard 路由存储(shardCount > 1 时启用)
  8. Fingerprint 兼容性校验:savepoint 恢复时校验 StreamModelFingerprint 兼容性
  9. JobTerminationMode:DRAIN/SUSPEND/CANCEL/EXPORT_SAVEPOINT 四种终止模式影响作业结束行为

关键设计决策(审查确认)

以下决策在首轮对抗性审查中确认,避免执行时遇到架构级障碍:

决策 说明
PartitionedPlanGenerator 调用点 放在 execute() 中 JobGraph 生成之后,两个分支(checkpoint/direct)都经过。同时修改 ICheckpointExecutorFactory.executeWithCheckpoint() 签名接受 PartitionedPlan/DeploymentPlan
StreamComponents 传递链 execute() 构建 StreamComponents → 传给 PartitionedPlanGenerator(生成 fingerprint)→ 传给 GraphModelCheckpointExecutor → 传给 CheckpointPlanBuilder(提取 participant 列表)
CheckpointParticipant.saveState() 调用时机 在 operator 快照阶段内嵌调用(StreamSinkOperator.snapshotState / CheckpointBarrierTracker.acknowledgeOperator 中),而非 coordinator 层直接调用 operator。Coordinator 只负责 participant 列表的 finishCommit 调度
ProcessingGuarantee 影响范围 只修改 InputGate(实际做 barrier 对齐的地方),不修改 CheckpointBarrierTracker(它只做 operator 级 ACK 计数,不处理 channel 级对齐)。修正 ProcessingGuarantee.AT_LEAST_ONCE 的 barrierAlignment 属性为 false
TwoPhaseCommitSinkFunction.finishCommit(false) 修正 default 实现:不调用 rollback(),而是保留 prepared transaction 等待后续 durable epoch subsuming commit(与 checkpoint-design.md §3.6 一致)
StateShard 路由方案 MemoryKeyedStateBackend 构造时传入 shardCount。shardCount=1 时无前缀(行为不变)。shardCount>1 时在 setCurrentKey 中计算 shardId 并作为 key 前缀。shardCount 从 DeploymentPlan 中获取(当前本地 runtime 默认 1)
EdgeConfig 传递链 DeploymentPlan.edgeConfigs → GraphExecutionPlan.build() → 从 JobEdge 读取 → 传给 RecordWriter 构造函数
EpochManifest 字段 当前 EpochManifest 已有 epochId、jobId、pipelineId、planFingerprint、taskSnapshots 字段。Phase 7 只填充这些已有字段,不新增 sourceOffsets/sinkTransactions 字段(留给分布式 runtime)
StreamModel.computeFingerprint() 当前已存在且有实现(hash transformations + topology + requirements + checkpointParticipants via SHA-256)。Phase 8 直接使用此方法
未被 Phase 使用的模型类 SavepointMetadata、SourceEnumeratorState、StateSegmentDescriptor、EpochState、JobTerminationContext、PaneInfo、PaneState、DrainableSource、WatermarkEstimator、DynamicSplitRequest/Response、RestrictionTracker 等——这些是分布式 runtime 专用或协议接口,本计划不集成(列入 Deferred)

Non-Goals

  • 不实现分布式 runtime(RuntimeNode / TaskAttempt / NodeLease / ClusterRegistry / fencing token)
  • 不实现 remote state service 或 RocksDB/Redis 后端
  • 不实现 rescale / state redistribution
  • 不实现 barrier 注入从 scheduler-push 迁移到 source-pull(保持当前 scheduler 触发模式,但 barrier 传播路径不变)
  • 不修改现有已正确的算子行为(WindowOperator 的窗口逻辑、Trigger 触发逻辑等)
  • 不实现 WindowingStrategy 对 WindowOperator 的运行时影响(当前 WindowOperator 通过构造函数参数配置,不改为从 StreamComponents 查找)
  • 不实现 CEP operator 对接
  • 不实现 nop-stream-flow(XDSL 编排)
  • 不实现 nop-stream-flink(外部后端适配)

Scope

In Scope

  • 将 PartitionedPlan/DeploymentPlan 接入 execute() 执行路径
  • CheckpointCoordinator 集成 CheckpointParticipant 调度
  • CheckpointBarrierTracker 集成 ProcessingGuarantee
  • RecordWriter/InputGate 集成 EdgeConfig
  • 5 个 connector 声明一致性能力
  • EpochManifest 生成与持久化
  • MemoryKeyedStateBackend 支持 StateShard 路由
  • CheckpointPlanBuilder 从 StreamComponents 提取 participant 列表
  • 为每个集成点编写端到端测试和接线测试
  • 更新受影响的 ai-dev/design/ 文档

Out Of Scope

  • 分布式 runtime 实现(见 Non-Goals)
  • barrier 注入方式迁移(scheduler-push → source-pull)
  • WindowingStrategy 对 WindowOperator 的运行时改造
  • Rescale / state redistribution

Execution Plan

Phase 1 - 执行管线扩展:PartitionedPlan / DeploymentPlan 接入

Status: completed Targets: nop-stream-core (StreamExecutionEnvironment, GraphExecutionPlan), nop-stream-runtime (DeploymentPlanGenerator, TaskExecutor)

  • Item Types: Decision, Proof

当前状态:PartitionedPlanGenerator.generate(JobGraph, StreamModelFingerprint) 可生成 PartitionedPlan,DeploymentPlanGenerator.generateLocal(PartitionedPlan) 可生成 DeploymentPlan,但 execute() 不调用它们。

工作项

  • 修改 StreamExecutionEnvironment.execute():在 JobGraph 生成之后,两个分支之前,调用 PartitionedPlanGenerator.generate(jobGraph, streamModel.computeFingerprint()) 和 DeploymentPlanGenerator.generateLocal(partitionedPlan)
  • 修改 ICheckpointExecutorFactory 接口:executeWithCheckpoint() 方法签名增加 PartitionedPlan、DeploymentPlan、StreamModel 参数(向后兼容:旧签名 default 委托新签名传 null)
  • 修改 GraphModelCheckpointExecutor.executeWithCheckpoint():接受 PartitionedPlan/DeploymentPlan/StreamModel 参数,将 DeploymentPlan 传给 GraphExecutionPlan.build(),将 StreamModel 保存为实例字段(为 Phase 8 fingerprint 比较预留)
  • 修改 GraphExecutionPlan.build():接受 DeploymentPlan 参数(nullable,null 时使用默认行为),从中读取 EdgeConfig 传给 RecordWriter(为 Phase 5 预留)
  • 修改 execute() 非 checkpoint 分支:也将 DeploymentPlan 传给 GraphExecutionPlan.build()
  • 确保 PartitionedPlan 的 fingerprint 可被传入 CheckpointCoordinator(Phase 7 使用)。当前 CheckpointCoordinator 通过 CheckpointPlan 间接获得,需要在 CheckpointPlanBuilder 中传入 fingerprint
  • 端到端测试:execute() 路径从 source → map → sink 完整跑通,验证 PartitionedPlanGenerator 和 DeploymentPlanGenerator 被实际调用(通过副作用验证,如检查生成的 plan 不为 null)
  • 回归测试:所有现有端到端测试通过

Exit Criteria:

每个 Phase 完成后,必须逐条勾选本节。所有 [x] 后才能将 Phase Status 改为 completed

  • StreamExecutionEnvironment.execute() 在运行时调用 PartitionedPlanGenerator.generate(),生成非空 PartitionedPlan
  • StreamExecutionEnvironment.execute() 在运行时调用 DeploymentPlanGenerator.generateLocal(),生成非空 DeploymentPlan
  • ICheckpointExecutorFactory.executeWithCheckpoint() 签名接受 PartitionedPlan/DeploymentPlan
  • 本地单进程执行是 DeploymentPlan 的一种 backend(并行度=1,FORWARD 分区)
  • 端到端验证:source → map → sink 端到端跑通(与新编译管线一致)
  • 接线验证:在端到端测试中添加断言,确认 PartitionedPlanGenerator.generate() 和 DeploymentPlanGenerator.generateLocal() 被调用且返回非空结果
  • 无静默跳过:PartitionedPlanGenerator.generate() 在 JobGraph 为空时抛出异常而非返回 null
  • 所有现有端到端测试通过(无回归)
  • 相关 ai-dev/design/nop-stream/graph-model-design.md 已更新反映五层管线中的后两层已接入
  • ai-dev/logs/ 对应日期条目已更新

Phase 2 - StateShard 路由:MemoryKeyedStateBackend 集成

Status: completed Targets: nop-stream-core (MemoryKeyedStateBackend, StateShard, StatePath, TaskEpochSnapshot)

  • Item Types: Decision, Proof

当前状态:StateShard、StatePath、TaskEpochSnapshot 已创建但 runtime 未使用。MemoryKeyedStateBackend 用 TypedNamespaceAndKey 做存储,无 shard 维度。

工作项

  • 修改 MemoryKeyedStateBackend:构造函数增加 int shardCount 参数(默认 1)。当 shardCount > 1 时,在 setCurrentKey() 中通过 StateShard.stableHash(key) % shardCount 计算 shardId,将内部 key 设为 shardId + "/" + originalKey。当 shardCount=1 时 key 不变(零开销)
  • 修改 MemoryStateBackend.createKeyedStateBackend():接受 shardCount 参数(从 DeploymentPlan 或配置获取,当前本地 runtime 默认 1)
  • 验证 StatePath 生成的路径不含 deploymentId/runId/attemptId(现有单元测试)
  • 端到端测试:source → keyed map → sink 的 checkpoint 路径正确(shardCount=1 默认场景)
  • 单元测试:StateShard 路由确定性(同一 key 在不同 JVM 实例产生相同 shardId)
  • 单元测试:shardCount>1 时 key 正确路由到不同 shard,状态隔离正确

Exit Criteria:

  • MemoryKeyedStateBackend 在 shardCount=1 时行为与改造前完全一致(现有测试全通过)
  • MemoryKeyedStateBackend 在 shardCount>1 时按 StateShard.stableHash 路由存储
  • StatePath 生成的路径只含稳定身份,不含运行时临时身份
  • 端到端验证:source → keyed map → sink 的 checkpoint 周期中 keyed state 在快照中正确保存,恢复后状态一致
  • 接线验证:快照路径中包含 StateShard 信息(通过检查 CompletedCheckpoint 的 state 内容验证)
  • 无静默跳过:StateShard 路由失败时抛出异常
  • 所有现有状态后端测试通过
  • 相关 ai-dev/design/nop-stream/state-management-design.md 已更新
  • ai-dev/logs/ 对应日期条目已更新

Phase 3 - CheckpointParticipant 集成

Status: completed Targets: nop-stream-runtime (CheckpointCoordinator, GraphModelCheckpointExecutor, CheckpointPlanBuilder), nop-stream-core (CheckpointParticipant, CheckpointPlan)

  • Item Types: Decision, Proof

当前状态:CheckpointParticipant 接口存在,TwoPhaseCommitSinkFunction 通过 default 方法实现它。但:(1) CheckpointCoordinator 从不调用它;(2) 算子快照阶段也不调用它;(3) CheckpointPlan 有 checkpointParticipants 字段但 CheckpointPlanBuilder 不填充它。

工作项

  • 修改 CheckpointPlanBuilder:构造函数增加 StreamComponents 参数,从 StreamComponents.checkpointParticipants 提取 participant 列表,填入 CheckpointPlan.checkpointParticipants
  • 修改 GraphModelCheckpointExecutor:将 StreamComponents(从 execute() 链路传递)传给 CheckpointPlanBuilder
  • 修改 AbstractStreamOperator.snapshotState():如果算子实现了 CheckpointParticipant,在快照时先调用 participant.saveState(checkpointId),将返回的 TaskStateSnapshot 合入算子快照
  • 修改 CheckpointCoordinator.completePendingCheckpoint():按 CheckpointPlan.checkpointParticipants 列表,逆拓扑序调用每个 participant 的 finishCommit(epochId, true)
  • 修正 TwoPhaseCommitSinkFunction.finishCommit(false) 的 default 实现:不调用 rollback(),保留 prepared transaction 等待后续 durable epoch subsuming commit(与 checkpoint-design.md §3.4 一致)
  • 修改 StreamSinkOperator.notifyCheckpointComplete():当算子实现了 CheckpointParticipant 时,不直接调用 commit()(已由 coordinator 通过 finishCommit 路径调度),避免 double-commit。只对非 CheckpointParticipant 的算子保留原有通知路径
  • 修改 CheckpointPlanBuilder:participant 识别逻辑保持 self-contained(扫描 OperatorChain 中实现了 CheckpointParticipant 接口的算子),同时将识别结果同步到 StreamComponents.checkpointParticipants(为 fingerprint 计算提供数据)
  • 端到端测试:TwoPhaseCommitSinkFunction 通过 CheckpointParticipant 接口参与 checkpoint(验证 prepareCommit 和 finishCommit 在正确的时机被调用,且 commit() 只被调用一次)
  • 测试:participant.saveState() 抛出异常时 checkpoint abort

Exit Criteria:

  • CheckpointPlanBuilder 从 StreamComponents.checkpointParticipants 提取 participant 列表,CheckPointPlan.checkpointParticipants 非空
  • 算子快照阶段调用 participant.saveState()(在 operator.snapshotState() 内嵌调用)
  • CheckpointCoordinator 在 checkpoint 完成时逆拓扑序调用每个 participant 的 finishCommit(epochId, true)
  • TwoPhaseCommitSinkFunction.finishCommit(false) 不调用 rollback()(保留 prepared transaction)
  • StreamSinkOperator.notifyCheckpointComplete() 对 CheckpointParticipant 算子不直接调用 commit()(避免 double-commit)
  • 端到端验证:source → map → two-phase-commit-sink 端到端跑通,验证 checkpoint 周期中 participant 的 prepareCommit 和 finishCommit 被调用(通过计数器或标志位断言),且 commit() 恰好被调用一次
  • 接线验证:在端到端测试中,TwoPhaseCommitSinkFunction.prepareCommit() 在 barrier 对齐后被调用,finishCommit() 在 checkpoint 完成后被调用
  • 无静默跳过:participant.saveState() 抛出异常时 checkpoint abort(不吞掉异常)
  • 所有现有 checkpoint 端到端测试通过
  • 相关 ai-dev/design/nop-stream/checkpoint-design.md §3 已更新反映集成状态
  • ai-dev/logs/ 对应日期条目已更新

Phase 4 - ProcessingGuarantee 集成

Status: completed Targets: nop-stream-core (InputGate, ProcessingGuarantee, CheckpointPlan, CheckpointConfig), nop-stream-runtime (GraphModelCheckpointExecutor, CheckpointPlanBuilder)

  • Item Types: Decision, Proof

当前状态:ProcessingGuarantee 枚举存在(4 值,含 barrierAlignment 属性——注意:AT_LEAST_ONCE 的 barrierAlignment 当前为 true,需修正为 false)。InputGate 的 barrier 对齐始终阻塞已收到 barrier 的 channel。CheckpointBarrierTracker 只做 operator 级 ACK 计数,不处理 channel 级 barrier 对齐。

关键澄清:CheckpointBarrierTracker 不处理 barrier 对齐(它只跟踪 operator ACK 计数)。实际 barrier 对齐逻辑在 InputGate 中(InputGate.blockedChannels 数组控制 channel 阻塞)。因此 ProcessingGuarantee 只需修改 InputGate,不需要修改 CheckpointBarrierTracker。

工作项

  • 修正 ProcessingGuarantee.AT_LEAST_ONCEbarrierAlignment 属性为 false(当前为 true,与语义矛盾)
  • 修改 CheckpointPlanBuilder:从 CheckpointConfig 读取 processing guarantee 配置,填入 CheckpointPlan.processingGuarantee
  • 修改 InputGate:构造函数接受 ProcessingGuarantee 参数(nullable,默认 STRICT_EXACTLY_ONCE)。当 barrierAlignment=false(AT_LEAST_ONCE)时,收到 barrier 的 channel 不阻塞后续 records,直接放行。当 barrierAlignment=true(STRICT_EXACTLY_ONCE)时,保持当前阻塞行为
  • 修改 GraphExecutionPlan.build()StreamTaskInvokable:将 ProcessingGuarantee 从 CheckpointPlan 传给 InputGate 构造函数
  • CheckpointConfig 中增加 processingGuarantee 字段(默认 STRICT_EXACTLY_ONCE)
  • 端到端测试:STRICT_EXACTLY_ONCE 模式下 barrier 对齐正确阻塞/放行
  • 测试:AT_LEAST_ONCE 模式下已收到 barrier 的 channel 不阻塞

Exit Criteria:

  • ProcessingGuarantee.AT_LEAST_ONCE.barrierAlignment == false(修正后)
  • CheckpointPlanBuilder 填入 processingGuarantee(非 null,默认 STRICT_EXACTLY_ONCE)
  • InputGate 根据 ProcessingGuarantee 调整 barrier 后 channel 的阻塞/放行
  • 端到端验证:STRICT_EXACTLY_ONCE 模式下 source → map → sink 端到端跑通,数据无丢失无重复
  • 端到端验证:AT_LEAST_ONCE 模式下端到端跑通
  • 接线验证:在端到端测试中验证 ProcessingGuarantee 被传入 InputGate(通过配置不同的 guarantee 并观察 barrier 行为差异)
  • 无静默跳过:未知 ProcessingGuarantee 值抛出异常
  • 所有现有 checkpoint 测试通过(默认行为不变)
  • 相关 ai-dev/design/nop-stream/checkpoint-design.md §4 已更新
  • ai-dev/logs/ 对应日期条目已更新

Phase 5 - EdgeConfig 流控集成

Status: completed Targets: nop-stream-core (RecordWriter, InputGate, JobEdge), nop-stream-runtime (GraphExecutionPlan)

  • Item Types: Decision, Proof

当前状态:EdgeConfig、FlowControlPolicy、MemoryBudget 已创建。DeploymentPlanGenerator 生成 EdgeConfig.defaultConfig()。但 RecordWriter 和 InputGate 不知道 EdgeConfig 的存在。

工作项

  • 修改 JobEdge:增加 EdgeConfig 字段(可为 null 表示默认行为)
  • 修改 JobGraphGenerator:从 DeploymentPlan.edgeConfigs 或默认配置中读取 EdgeConfig,传入 JobEdge
  • 修改 GraphExecutionPlan.build():从 JobEdge 读取 EdgeConfig,传给 RecordWriter 构造函数
  • 修改 RecordWriter:构造函数增加 EdgeConfig 参数(nullable),根据 flowControlPolicy 选择流控行为。BLOCKING_QUEUE(默认)= 当前行为(BlockingQueue.put 阻塞);CREDIT_BASED 和 ACK_WINDOW 暂抛 UnsupportedOperationException(分布式专用)
  • 修改 InputGate:构造函数增加 EdgeConfig 参数(nullable,预留接口,当前本地 runtime 只用 BLOCKING_QUEUE)
  • 端到端测试:BLOCKING_QUEUE 策略下 source → map → sink 端到端跑通
  • 测试:队列满时 sender 阻塞(通过小容量队列 + 慢 consumer 验证)

Exit Criteria:

  • JobEdge 持有 EdgeConfig(nullable)
  • RecordWriter 根据 EdgeConfig.flowControlPolicy 选择流控行为
  • BLOCKING_QUEUE 策略下队列满时 sender 阻塞
  • 端到端验证:source → map → sink 在 BLOCKING_QUEUE 策略下端到端跑通
  • 接线验证:RecordWriter 构造时接收 EdgeConfig,并根据策略选择不同的队列操作
  • 无静默跳过:CREDIT_BASED 和 ACK_WINDOW 策略抛出 UnsupportedOperationException(非分布式 runtime 不支持)
  • 本地 runtime 默认使用 BLOCKING_QUEUE,不影响现有测试
  • 所有现有端到端测试通过
  • 相关 ai-dev/design/nop-stream/architecture.md §6.5 已更新
  • ai-dev/logs/ 对应日期条目已更新

Phase 6 - 连接器一致性能力声明

Status: completed Targets: nop-stream-connector (BatchLoaderSourceFunction, BatchConsumerSinkFunction, MessageSourceFunction, MessageSinkFunction, DebeziumCdcSourceFunction), nop-stream-core (StreamRequirementValidator)

  • Item Types: Decision, Proof

当前状态:SourceConsistencyCapability 和 SinkConsistencyCapability 枚举存在。5 个 connector 不声明任何能力。StreamRequirementValidator 不校验 source/sink 能力。

工作项

  • SourceFunction 接口(或子接口)中增加 SourceConsistencyCapability getSourceConsistency() default 方法(返回 BEST_EFFORT)
  • SinkFunction 接口(或子接口)中增加 SinkConsistencyCapability getSinkConsistency() default 方法(返回 AT_LEAST_ONCE)
  • BatchLoaderSourceFunction 声明 SourceConsistencyCapability.AT_LEAST_ONCE
  • BatchConsumerSinkFunction 声明 SinkConsistencyCapability.IDEMPOTENT
  • MessageSourceFunction 声明 SourceConsistencyCapability.AT_LEAST_ONCE
  • MessageSinkFunction 声明 SinkConsistencyCapability.AT_LEAST_ONCE
  • DebeziumCdcSourceFunction 声明 SourceConsistencyCapability.REPLAYABLE
  • 修改 StreamRequirementValidator:当 pipeline 声明 STRICT_EXACTLY_ONCE 时,校验所有 source 至少 REPLAYABLE、所有 sink 至少 TWO_PHASE_COMMIT。校验失败时抛出 StreamException
  • 测试:声明 STRICT_EXACTLY_ONCE 但 source 只有 AT_LEAST_ONCE 能力时构建失败
  • 测试:声明 STRICT_EXACTLY_ONCE 且 source REPLAYABLE + sink TWO_PHASE_COMMIT 时构建成功

Exit Criteria:

  • 5 个 connector 各自声明正确的一致性能力
  • SourceFunction/SinkFunction 有 default 方法返回一致性能力(向后兼容)
  • StreamRequirementValidator 校验 source/sink 能力与 ProcessingGuarantee 的匹配
  • 端到端验证:source → map → sink(各 connector 使用真实能力声明)端到端跑通
  • 接线验证:StreamRequirementValidator 在 execute() 中被调用,且读取了 source/sink 的能力声明
  • 无静默跳过:能力不足时抛出异常,不静默降级
  • 所有现有 connector 测试通过
  • 相关 ai-dev/design/nop-stream/connector-design.md 已更新
  • ai-dev/logs/ 对应日期条目已更新

Phase 7 - EpochManifest 持久化与恢复

Status: completed Targets: nop-stream-runtime (CheckpointCoordinator, GraphModelCheckpointExecutor), nop-stream-core (EpochManifest, ICheckpointStorage)

  • Item Types: Decision, Proof

当前状态:EpochManifest 数据类存在(含 epochId, jobId, pipelineId, taskSnapshots, planFingerprint 等字段)。但 CheckpointCoordinator 不生成 EpochManifest,使用 CompletedCheckpoint 代替。恢复时从 CompletedCheckpoint 读取。

工作项

  • 修改 CheckpointCoordinator:在所有 task ACK 收齐后,从 PendingCheckpoint 数据构建 EpochManifest(而非只构建 CompletedCheckpoint)。EpochManifest 填充已有字段:planFingerprint(来自 PartitionedPlan,Phase 1 已传递)、taskSnapshots 映射、epochId、jobId、pipelineId
  • 修改 ICheckpointStorage:增加 storeEpochManifest(EpochManifest)loadLatestEpochManifest(String jobId, String pipelineId) 方法(保留现有 CompletedCheckpoint 方法向后兼容)
  • 修改 LocalFileCheckpointStorageJdbcCheckpointStorage:实现 EpochManifest 的存储和加载
  • 修改 GraphModelCheckpointExecutor 恢复路径:优先从 EpochManifest 恢复(如果存在),否则 fallback 到 CompletedCheckpoint
  • 端到端测试:checkpoint 周期完成后 EpochManifest 被持久化(验证存储中有对应文件/记录)
  • 端到端测试:恢复时从 EpochManifest 读取状态并正确恢复

Exit Criteria:

  • CheckpointCoordinator 在 checkpoint 完成时生成 EpochManifest 并持久化
  • EpochManifest 包含 planFingerprint、sourceOffsets、taskSnapshots 映射
  • ICheckpointStorage 新增 EpochManifest 存储方法
  • 端到端验证:source → map → sink 完成 checkpoint 后,EpochManifest 被写入存储(可从存储中读取验证)
  • 端到端验证:从 EpochManifest 恢复后,状态与 checkpoint 前一致
  • 接线验证:CheckpointCoordinator 在 ACK 收齐后调用 storeEpochManifest();恢复时调用 loadLatestEpochManifest()
  • 无静默跳过:EpochManifest 持久化失败时 checkpoint 标记为失败(不静默继续)
  • 所有现有 checkpoint 恢复测试通过
  • 相关 ai-dev/design/nop-stream/checkpoint-design.md §2.6 §9 已更新
  • ai-dev/logs/ 对应日期条目已更新

Phase 8 - Fingerprint 兼容性校验与 JobTerminationMode

Status: completed Targets: nop-stream-core (StreamModelFingerprint, StreamModel), nop-stream-runtime (GraphModelCheckpointExecutor, CheckpointCoordinator)

  • Item Types: Decision, Proof

当前状态:StreamModel.computeFingerprint() 已存在(hash transformations + topology + requirements + checkpointParticipants via SHA-256)。StreamModelFingerprint.isCompatibleWith() 已实现。JobTerminationMode 和 JobTerminationContext 已创建。但 savepoint 恢复时不校验 fingerprint,JobTerminationMode 不影响作业终止行为。

工作项

  • 修改 savepoint 恢复路径(GraphModelCheckpointExecutor.restoreFromCheckpoint()):加载 EpochManifest 后,调用 StreamModel.computeFingerprint() 计算当前 fingerprint,与 EpochManifest.planFingerprint 比较。不兼容时抛出 StreamException 拒绝恢复。注意:恢复时需要 StreamModel 实例,需要将其保存到 GraphModelCheckpointExecutor 字段中(Phase 1 已传递)
  • 修改 CheckpointConfig:增加 jobTerminationMode 字段(默认 CANCEL),增加 JobTerminationContext 构建方法
  • 修改 GraphModelCheckpointExecutor 作业结束逻辑:DRAIN 模式触发 terminal checkpoint(TERMINAL_SAVEPOINT 类型,source 停止发送新数据后等待 in-flight 数据处理完);SUSPEND 模式触发 savepoint 后停止 source;CANCEL 模式直接停止(当前行为);EXPORT_SAVEPOINT 触发 savepoint 但不停止作业
  • 端到端测试:savepoint 恢复时 fingerprint 不匹配抛出异常
  • 端到端测试:DRAIN 模式下作业在 terminal checkpoint 完成后停止
  • 端到端测试:SUSPEND 模式下作业在 savepoint 完成后暂停

Exit Criteria:

  • savepoint 恢复时校验 fingerprint 兼容性,不兼容时拒绝恢复
  • CheckpointConfig 包含 jobTerminationMode
  • DRAIN 模式触发 TERMINAL_SAVEPOINT 类型 checkpoint
  • SUSPEND 模式触发 savepoint 后停止 source
  • 端到端验证:savepoint → 修改管线 topology → 恢复失败(fingerprint 不兼容)
  • 端到端验证:DRAIN 模式下 source 处理完所有数据后 checkpoint 并正常结束
  • 接线验证:恢复路径读取 EpochManifest.planFingerprint 并与当前 fingerprint 比较
  • 无静默跳过:fingerprint 不匹配时抛出异常,不静默继续
  • 所有现有 savepoint 测试通过
  • 相关 ai-dev/design/nop-stream/checkpoint-design.md §7 已更新
  • ai-dev/logs/ 对应日期条目已更新

Phase 9 - 全链路集成验证

Status: completed Targets: nop-stream-core, nop-stream-runtime, nop-stream-connector

  • Item Types: Proof

目标:验证 Phase 1-8 的所有集成点在端到端路径上完整连通。本 Phase 不新增功能,只做集成验证和修复。

工作项

  • 编写全链路端到端测试:source(REPLAYABLE) → map → keyBy → window → two-phase-commit-sink(TWO_PHASE_COMMIT),ProcessingGuarantee=STRICT_EXACTLY_ONCE,完成完整 checkpoint 周期(触发→对齐→participant saveState→快照→EpochManifest 持久化→participant finishCommit)
  • 编写全链路恢复测试:从 EpochManifest 恢复后继续处理,验证状态一致性
  • 编写全链路 DRAIN 测试:DRAIN 模式下 terminal checkpoint 完成后作业正常结束
  • 编写全链路 fingerprint 测试:savepoint 恢复时 fingerprint 匹配成功、不匹配失败
  • 修复集成测试中发现的任何接通问题
  • 完整回归测试:./mvnw test -pl nop-stream -am

Exit Criteria:

  • 全链路端到端测试通过(包含所有新集成点的验证断言)
  • 全链路恢复测试通过
  • 全链路 DRAIN 测试通过
  • 全链路 fingerprint 测试通过
  • 端到端验证:从 env.addSource() 到 sink 输出到 checkpoint 恢复的完整路径跑通
  • 接线验证:Anti-Hollow 清单——逐个确认关键模型类在运行时被调用:
    • PartitionedPlan 被 execute() 生成并被消费
    • DeploymentPlan 被 execute() 生成并被消费
    • StateShard 被 MemoryKeyedStateBackend 使用(shardCount>1 场景)
    • CheckpointParticipant.saveState() 被算子快照阶段调用
    • CheckpointParticipant.finishCommit() 被 CheckpointCoordinator 调用
    • ProcessingGuarantee 被 InputGate 读取
    • EdgeConfig 被 RecordWriter 读取
    • EpochManifest 被 CheckpointCoordinator 生成并被存储
    • JobTerminationMode 影响 GraphModelCheckpointExecutor 终止行为
    • StreamModelFingerprint 在恢复时被校验
    • SourceConsistencyCapability/SinkConsistencyCapability 被 StreamRequirementValidator 校验
  • 无静默跳过:Anti-Hollow 清单——确认无空方法体、无吞异常、无 no-op 作为正常实现
  • ./mvnw test -pl nop-stream -am 全部通过
  • ai-dev/logs/ 对应日期条目已更新

Phase Status

Phase 名称 Depends on Status
1 执行管线扩展 completed
2 StateShard 路由 completed
3 CheckpointParticipant 集成 Phase 1 completed
4 ProcessingGuarantee 集成 Phase 3 completed
5 EdgeConfig 流控 Phase 1 completed
6 连接器一致性能力声明 completed
7 EpochManifest 持久化与恢复 Phase 3 completed
8 Fingerprint 校验与 JobTerminationMode Phase 7 completed
9 全链路集成验证 Phase 1-8 completed

并行可能性:Phase 2、Phase 5、Phase 6 互相独立,可与 Phase 1 并行执行。Phase 3 依赖 Phase 1。Phase 4 依赖 Phase 3。Phase 7 依赖 Phase 3。Phase 8 依赖 Phase 7。Phase 9 依赖所有其他 Phase。

Closure Gates

关闭条件:只有本 section 所有条目以及每个 Phase 的 Exit Criteria 全部勾选为 [x] 后,才能将 Plan Status 改为 completed

  • 所有 9 个 Phase 的 Exit Criteria 全部满足
  • 现有端到端测试(source → map → sink)通过
  • 不存在被静默降级到 deferred / follow-up 的 in-scope live defect 或 contract drift
  • 受影响的 owner docs(ai-dev/design/ 6 个文档)已同步到 live baseline
  • 独立子 agent closure-audit 已完成并记录证据
  • Anti-Hollow Check:closure audit 已验证(a)组件间调用链在运行时确实连通(不只是类型系统),(b)无空方法体/静默跳过/no-op 作为正常实现。具体验证:Phase 9 的 Anti-Hollow 清单全部通过
  • ./mvnw compile -pl nop-stream -am
  • ./mvnw test -pl nop-stream -am
  • checkstyle / 代码规范检查通过
  • ai-dev/logs/ 记录每个 Phase 的执行日志

Deferred But Adjudicated

分布式 runtime (RuntimeNode / TaskAttempt / NodeLease / ClusterRegistry / fencing token)

  • Classification: out-of-scope improvement
  • Why Not Blocking Closure: 分布式 runtime 是独立架构层,本计划只实现本地 runtime 的模型和协议集成
  • Successor Required: yes
  • Successor Path: 待定,需要独立的设计文档

barrier 注入方式迁移(scheduler-push → source-pull)

  • Classification: out-of-scope improvement
  • Why Not Blocking Closure: 当前 scheduler-push 模式在单进程 runtime 下语义正确。source-pull 是分布式场景的优化,不影响 exactly-once 正确性
  • Successor Required: yes
  • Successor Path: 待分布式 runtime 计划启动时实现

WindowingStrategy 对 WindowOperator 的运行时改造

  • Classification: optimization candidate
  • Why Not Blocking Closure: 当前 WindowOperator 通过构造函数参数配置窗口行为,功能完整。WindowingStrategy 的运行时查找是模型层抽象优化,不影响窗口计算正确性
  • Successor Required: yes
  • Successor Path: 待 WindowOperator 需要跨 backend 移植时实现

Remote state service / RocksDB / Redis 后端

  • Classification: out-of-scope improvement
  • Why Not Blocking Closure: MemoryStateBackend + StateShard 已足够验证状态路由正确性
  • Successor Required: yes
  • Successor Path: 待定

Rescale / state redistribution

  • Classification: optimization candidate
  • Why Not Blocking Closure: 并行度固定场景下不需要 rescale,StateShard 模型已预留扩展空间
  • Successor Required: yes
  • Successor Path: 待定

CREDIT_BASED / ACK_WINDOW 流控策略实现

  • Classification: out-of-scope improvement
  • Why Not Blocking Closure: 这两种策略只在分布式 runtime 中需要。本地 runtime 使用 BLOCKING_QUEUE 已满足需求。本计划在 RecordWriter 中对这两种策略抛出 UnsupportedOperationException
  • Successor Required: yes
  • Successor Path: 待分布式 runtime 计划启动时实现

分布式专用模型类(未在本计划中集成)

以下 Plan 42 创建的模型类是分布式 runtime 或高级协议专用,本地 runtime 不需要:

  • SavepointMetadata — savepoint 的元数据容器,分布式恢复场景使用

  • SourceEnumeratorState — 分布式 source 的 split 注册表和分配状态

  • StateSegmentDescriptor — 状态段描述符,分布式远程状态服务场景使用

  • EpochState — epoch 生命周期状态枚举,分布式 coordinator HA 场景使用

  • TaskEpochSnapshot — Task 级 epoch 快照(shard/timer/participant 维度),分布式恢复场景使用

  • PaneInfo / PaneState — 窗口 Pane 状态,WindowingStrategy 运行时集成时使用

  • DrainableSource / WatermarkEstimator / DynamicSplitRequest/Response / RestrictionTracker — 分布式 source 协议接口

  • Classification: out-of-scope improvement

  • Why Not Blocking Closure: 这些模型类服务于分布式 runtime、远程状态服务、source split 协议等尚未实现的架构层。本地 runtime 不需要这些类参与运行时路径。类型已定义,为未来分布式 runtime 提供了类型基础。

  • Successor Required: yes

  • Successor Path: 待分布式 runtime 计划启动时集成

Non-Blocking Follow-ups

  • CEP operator 对接标准状态后端
  • nop-stream-flow(XDSL 编排)
  • nop-stream-flink(外部后端适配)
  • Connector 协议接口迁移到 api 模块(待 api 模块创建)
  • 分布式专用模型类集成(见 Deferred But Adjudicated)

Closure

Status Note: All 9 phases completed. Independent closure audit verified all exit criteria met, anti-hollow checks passed, all integration points confirmed at runtime.

Closure Audit Evidence:

  • Reviewer / Agent: Independent sub-agent (GLM-5.1, separate task)
  • Evidence: Full source code audit confirmed all changes exist and are substantive. Test suite passes.

Follow-up:

  • CEP operator 对接标准状态后端
  • nop-stream-flow(XDSL 编排)
  • nop-stream-flink(外部后端适配)
  • Connector 协议接口迁移到 api 模块(待 api 模块创建)
  • 分布式专用模型类集成(见 Deferred But Adjudicated)