44 nop-stream 分布式设计模型运行时集成
Plan Status: completed Last Reviewed: 2026-05-24 Source: Plan 42 模型层实现审计发现——35 个模型类已创建但运行时零集成;结合
ai-dev/design/nop-stream/全量设计文档分析 Related: Plan 42 (design-implementation, superseded by this plan)
Purpose
将 Plan 42 已创建但未接入运行时的 35 个模型类完整集成到 nop-stream 执行管线和 checkpoint 子系统中。当前状态:类型已存在,运行时行为为零。本计划完成后,从 execute() 到 checkpoint 恢复的完整路径上,每个新模型类都必须被运行时代码实际调用。
Current Baseline
Plan 42 已创建的模型类(35 个 + 9 枚举):全部存在且编译通过,869 个现有测试通过。
运行时集成审计结果(全部为 NO):
| 审计项 | 状态 |
|---|---|
| CheckpointCoordinator 调用 CheckpointParticipant | NO |
| GraphModelCheckpointExecutor 使用 EpochManifest / ProcessingGuarantee / CheckpointParticipant | NO(全部 NO) |
| RecordWriter 读取 EdgeConfig | NO |
| InputGate 读取 EdgeConfig 或 ProcessingGuarantee | NO |
| StreamExecutionEnvironment.execute() 调用 PartitionedPlanGenerator / DeploymentPlanGenerator | NO |
| CheckpointPlanBuilder 读取 StreamComponents.checkpointParticipants | NO |
| WindowOperator 引用 WindowingStrategy / StreamComponents | NO |
| CheckpointBarrierTracker 根据 ProcessingGuarantee 调整 barrier 行为 | NO |
| 任何 connector 声明 SourceConsistencyCapability / SinkConsistencyCapability | NO |
| MemoryKeyedStateBackend 使用 StateShard 路由 | NO |
| TaskEpochSnapshot 在 runtime 中被使用 | NO(dead code) |
已有且正常工作的运行时路径:
execute()→ StreamGraphGenerator → JobGraphGenerator → GraphExecutionPlan → TaskExecutor- CheckpointCoordinator + CheckpointBarrierTracker + CheckpointPlanBuilder + PendingCheckpoint
- Source → ChainingOutput → Operator → Sink(单链/多链管线)
- Barrier 注入(scheduler 线程触发)→ 传播 → ACK → 持久化 → 恢复
- TwoPhaseCommitSinkFunction 的 preCommit/commit/rollback(不通过 CheckpointParticipant 接口)
- RecordWriter 按 IPartitioner 分区(无 EdgeConfig)
- InputGate barrier 对齐(硬编码"等待所有 channel",无 ProcessingGuarantee 分支)
关键现有类及其位置:
| 类 | 模块 | 包 |
|---|---|---|
| StreamExecutionEnvironment | core | io.nop.stream.core.environment |
| CheckpointCoordinator | runtime | io.nop.stream.runtime.checkpoint |
| GraphModelCheckpointExecutor | runtime | io.nop.stream.runtime.execution |
| CheckpointBarrierTracker | core | io.nop.stream.core.execution |
| CheckpointPlanBuilder | runtime | io.nop.stream.runtime.checkpoint |
| RecordWriter | core | io.nop.stream.core.execution |
| InputGate | core | io.nop.stream.core.execution |
| GraphExecutionPlan | runtime | io.nop.stream.runtime.execution |
| StreamTaskInvokable | runtime | io.nop.stream.runtime.execution |
| MemoryKeyedStateBackend | core | io.nop.stream.core.common.state.backend.memory |
| WindowOperator | runtime | io.nop.stream.runtime.operators.windowing |
| JobGraphGenerator | core | io.nop.stream.core.graph |
| StreamGraphGenerator | core | io.nop.stream.core.graph |
| PartitionedPlanGenerator | core | io.nop.stream.core.graph |
| DeploymentPlanGenerator | runtime | io.nop.stream.runtime.execution |
Goals
- 执行管线扩展:
execute()调用 PartitionedPlanGenerator → DeploymentPlanGenerator,生成的 DeploymentPlan 被 GraphExecutionPlan/TaskExecutor 消费 - CheckpointParticipant 集成:算子快照阶段调用 participant.saveState(),checkpoint 完成时 coordinator 调用 participant.finishCommit(true)。TwoPhaseCommitSinkFunction 通过 CheckpointParticipant 接口参与
- ProcessingGuarantee 生效:InputGate 根据 ProcessingGuarantee 决定 barrier 对齐行为(STRICT 阻塞已收到 barrier 的 channel vs AT_LEAST_ONCE 不阻塞)
- EdgeConfig 流控:RecordWriter 根据 EdgeConfig.flowControlPolicy 选择流控行为
- 连接器能力声明:5 个现有 connector 声明一致性能力,参与 StreamRequirementValidator 校验
- EpochManifest 持久化:CheckpointCoordinator 完成快照后生成 EpochManifest(含 planFingerprint、taskSnapshots)并持久化,恢复时优先从 EpochManifest 读取
- StateShard 路由:MemoryKeyedStateBackend 支持按 StateShard 路由存储(shardCount > 1 时启用)
- Fingerprint 兼容性校验:savepoint 恢复时校验 StreamModelFingerprint 兼容性
- JobTerminationMode:DRAIN/SUSPEND/CANCEL/EXPORT_SAVEPOINT 四种终止模式影响作业结束行为
关键设计决策(审查确认)
以下决策在首轮对抗性审查中确认,避免执行时遇到架构级障碍:
| 决策 | 说明 |
|---|---|
| PartitionedPlanGenerator 调用点 | 放在 execute() 中 JobGraph 生成之后,两个分支(checkpoint/direct)都经过。同时修改 ICheckpointExecutorFactory.executeWithCheckpoint() 签名接受 PartitionedPlan/DeploymentPlan |
| StreamComponents 传递链 | execute() 构建 StreamComponents → 传给 PartitionedPlanGenerator(生成 fingerprint)→ 传给 GraphModelCheckpointExecutor → 传给 CheckpointPlanBuilder(提取 participant 列表) |
| CheckpointParticipant.saveState() 调用时机 | 在 operator 快照阶段内嵌调用(StreamSinkOperator.snapshotState / CheckpointBarrierTracker.acknowledgeOperator 中),而非 coordinator 层直接调用 operator。Coordinator 只负责 participant 列表的 finishCommit 调度 |
| ProcessingGuarantee 影响范围 | 只修改 InputGate(实际做 barrier 对齐的地方),不修改 CheckpointBarrierTracker(它只做 operator 级 ACK 计数,不处理 channel 级对齐)。修正 ProcessingGuarantee.AT_LEAST_ONCE 的 barrierAlignment 属性为 false |
| TwoPhaseCommitSinkFunction.finishCommit(false) | 修正 default 实现:不调用 rollback(),而是保留 prepared transaction 等待后续 durable epoch subsuming commit(与 checkpoint-design.md §3.6 一致) |
| StateShard 路由方案 | MemoryKeyedStateBackend 构造时传入 shardCount。shardCount=1 时无前缀(行为不变)。shardCount>1 时在 setCurrentKey 中计算 shardId 并作为 key 前缀。shardCount 从 DeploymentPlan 中获取(当前本地 runtime 默认 1) |
| EdgeConfig 传递链 | DeploymentPlan.edgeConfigs → GraphExecutionPlan.build() → 从 JobEdge 读取 → 传给 RecordWriter 构造函数 |
| EpochManifest 字段 | 当前 EpochManifest 已有 epochId、jobId、pipelineId、planFingerprint、taskSnapshots 字段。Phase 7 只填充这些已有字段,不新增 sourceOffsets/sinkTransactions 字段(留给分布式 runtime) |
| StreamModel.computeFingerprint() | 当前已存在且有实现(hash transformations + topology + requirements + checkpointParticipants via SHA-256)。Phase 8 直接使用此方法 |
| 未被 Phase 使用的模型类 | SavepointMetadata、SourceEnumeratorState、StateSegmentDescriptor、EpochState、JobTerminationContext、PaneInfo、PaneState、DrainableSource、WatermarkEstimator、DynamicSplitRequest/Response、RestrictionTracker 等——这些是分布式 runtime 专用或协议接口,本计划不集成(列入 Deferred) |
Non-Goals
- 不实现分布式 runtime(RuntimeNode / TaskAttempt / NodeLease / ClusterRegistry / fencing token)
- 不实现 remote state service 或 RocksDB/Redis 后端
- 不实现 rescale / state redistribution
- 不实现 barrier 注入从 scheduler-push 迁移到 source-pull(保持当前 scheduler 触发模式,但 barrier 传播路径不变)
- 不修改现有已正确的算子行为(WindowOperator 的窗口逻辑、Trigger 触发逻辑等)
- 不实现 WindowingStrategy 对 WindowOperator 的运行时影响(当前 WindowOperator 通过构造函数参数配置,不改为从 StreamComponents 查找)
- 不实现 CEP operator 对接
- 不实现 nop-stream-flow(XDSL 编排)
- 不实现 nop-stream-flink(外部后端适配)
Scope
In Scope
- 将 PartitionedPlan/DeploymentPlan 接入 execute() 执行路径
- CheckpointCoordinator 集成 CheckpointParticipant 调度
- CheckpointBarrierTracker 集成 ProcessingGuarantee
- RecordWriter/InputGate 集成 EdgeConfig
- 5 个 connector 声明一致性能力
- EpochManifest 生成与持久化
- MemoryKeyedStateBackend 支持 StateShard 路由
- CheckpointPlanBuilder 从 StreamComponents 提取 participant 列表
- 为每个集成点编写端到端测试和接线测试
- 更新受影响的
ai-dev/design/文档
Out Of Scope
- 分布式 runtime 实现(见 Non-Goals)
- barrier 注入方式迁移(scheduler-push → source-pull)
- WindowingStrategy 对 WindowOperator 的运行时改造
- Rescale / state redistribution
Execution Plan
Phase 1 - 执行管线扩展:PartitionedPlan / DeploymentPlan 接入
Status: completed
Targets: nop-stream-core (StreamExecutionEnvironment, GraphExecutionPlan), nop-stream-runtime (DeploymentPlanGenerator, TaskExecutor)
- Item Types:
Decision,Proof
当前状态:PartitionedPlanGenerator.generate(JobGraph, StreamModelFingerprint) 可生成 PartitionedPlan,DeploymentPlanGenerator.generateLocal(PartitionedPlan) 可生成 DeploymentPlan,但 execute() 不调用它们。
工作项:
- 修改
StreamExecutionEnvironment.execute():在 JobGraph 生成之后,两个分支之前,调用 PartitionedPlanGenerator.generate(jobGraph, streamModel.computeFingerprint()) 和 DeploymentPlanGenerator.generateLocal(partitionedPlan) - 修改
ICheckpointExecutorFactory接口:executeWithCheckpoint()方法签名增加 PartitionedPlan、DeploymentPlan、StreamModel 参数(向后兼容:旧签名 default 委托新签名传 null) - 修改
GraphModelCheckpointExecutor.executeWithCheckpoint():接受 PartitionedPlan/DeploymentPlan/StreamModel 参数,将 DeploymentPlan 传给GraphExecutionPlan.build(),将 StreamModel 保存为实例字段(为 Phase 8 fingerprint 比较预留) - 修改
GraphExecutionPlan.build():接受 DeploymentPlan 参数(nullable,null 时使用默认行为),从中读取 EdgeConfig 传给 RecordWriter(为 Phase 5 预留) - 修改
execute()非 checkpoint 分支:也将 DeploymentPlan 传给GraphExecutionPlan.build() - 确保 PartitionedPlan 的 fingerprint 可被传入 CheckpointCoordinator(Phase 7 使用)。当前 CheckpointCoordinator 通过 CheckpointPlan 间接获得,需要在 CheckpointPlanBuilder 中传入 fingerprint
- 端到端测试:
execute()路径从 source → map → sink 完整跑通,验证 PartitionedPlanGenerator 和 DeploymentPlanGenerator 被实际调用(通过副作用验证,如检查生成的 plan 不为 null) - 回归测试:所有现有端到端测试通过
Exit Criteria:
每个 Phase 完成后,必须逐条勾选本节。所有
[x]后才能将 Phase Status 改为completed。
-
StreamExecutionEnvironment.execute()在运行时调用 PartitionedPlanGenerator.generate(),生成非空 PartitionedPlan -
StreamExecutionEnvironment.execute()在运行时调用 DeploymentPlanGenerator.generateLocal(),生成非空 DeploymentPlan -
ICheckpointExecutorFactory.executeWithCheckpoint()签名接受 PartitionedPlan/DeploymentPlan - 本地单进程执行是 DeploymentPlan 的一种 backend(并行度=1,FORWARD 分区)
- 端到端验证:source → map → sink 端到端跑通(与新编译管线一致)
- 接线验证:在端到端测试中添加断言,确认 PartitionedPlanGenerator.generate() 和 DeploymentPlanGenerator.generateLocal() 被调用且返回非空结果
- 无静默跳过:PartitionedPlanGenerator.generate() 在 JobGraph 为空时抛出异常而非返回 null
- 所有现有端到端测试通过(无回归)
- 相关
ai-dev/design/nop-stream/graph-model-design.md已更新反映五层管线中的后两层已接入 -
ai-dev/logs/对应日期条目已更新
Phase 2 - StateShard 路由:MemoryKeyedStateBackend 集成
Status: completed
Targets: nop-stream-core (MemoryKeyedStateBackend, StateShard, StatePath, TaskEpochSnapshot)
- Item Types:
Decision,Proof
当前状态:StateShard、StatePath、TaskEpochSnapshot 已创建但 runtime 未使用。MemoryKeyedStateBackend 用 TypedNamespaceAndKey 做存储,无 shard 维度。
工作项:
- 修改
MemoryKeyedStateBackend:构造函数增加int shardCount参数(默认 1)。当 shardCount > 1 时,在setCurrentKey()中通过StateShard.stableHash(key) % shardCount计算 shardId,将内部 key 设为shardId + "/" + originalKey。当 shardCount=1 时 key 不变(零开销) - 修改
MemoryStateBackend.createKeyedStateBackend():接受 shardCount 参数(从 DeploymentPlan 或配置获取,当前本地 runtime 默认 1) - 验证 StatePath 生成的路径不含 deploymentId/runId/attemptId(现有单元测试)
- 端到端测试:source → keyed map → sink 的 checkpoint 路径正确(shardCount=1 默认场景)
- 单元测试:StateShard 路由确定性(同一 key 在不同 JVM 实例产生相同 shardId)
- 单元测试:shardCount>1 时 key 正确路由到不同 shard,状态隔离正确
Exit Criteria:
- MemoryKeyedStateBackend 在 shardCount=1 时行为与改造前完全一致(现有测试全通过)
- MemoryKeyedStateBackend 在 shardCount>1 时按 StateShard.stableHash 路由存储
- StatePath 生成的路径只含稳定身份,不含运行时临时身份
- 端到端验证:source → keyed map → sink 的 checkpoint 周期中 keyed state 在快照中正确保存,恢复后状态一致
- 接线验证:快照路径中包含 StateShard 信息(通过检查 CompletedCheckpoint 的 state 内容验证)
- 无静默跳过:StateShard 路由失败时抛出异常
- 所有现有状态后端测试通过
- 相关
ai-dev/design/nop-stream/state-management-design.md已更新 -
ai-dev/logs/对应日期条目已更新
Phase 3 - CheckpointParticipant 集成
Status: completed
Targets: nop-stream-runtime (CheckpointCoordinator, GraphModelCheckpointExecutor, CheckpointPlanBuilder), nop-stream-core (CheckpointParticipant, CheckpointPlan)
- Item Types:
Decision,Proof
当前状态:CheckpointParticipant 接口存在,TwoPhaseCommitSinkFunction 通过 default 方法实现它。但:(1) CheckpointCoordinator 从不调用它;(2) 算子快照阶段也不调用它;(3) CheckpointPlan 有 checkpointParticipants 字段但 CheckpointPlanBuilder 不填充它。
工作项:
- 修改
CheckpointPlanBuilder:构造函数增加StreamComponents参数,从 StreamComponents.checkpointParticipants 提取 participant 列表,填入 CheckpointPlan.checkpointParticipants - 修改
GraphModelCheckpointExecutor:将 StreamComponents(从 execute() 链路传递)传给 CheckpointPlanBuilder - 修改
AbstractStreamOperator.snapshotState():如果算子实现了 CheckpointParticipant,在快照时先调用participant.saveState(checkpointId),将返回的 TaskStateSnapshot 合入算子快照 - 修改
CheckpointCoordinator.completePendingCheckpoint():按 CheckpointPlan.checkpointParticipants 列表,逆拓扑序调用每个 participant 的finishCommit(epochId, true) - 修正
TwoPhaseCommitSinkFunction.finishCommit(false)的 default 实现:不调用rollback(),保留 prepared transaction 等待后续 durable epoch subsuming commit(与 checkpoint-design.md §3.4 一致) - 修改
StreamSinkOperator.notifyCheckpointComplete():当算子实现了 CheckpointParticipant 时,不直接调用commit()(已由 coordinator 通过 finishCommit 路径调度),避免 double-commit。只对非 CheckpointParticipant 的算子保留原有通知路径 - 修改
CheckpointPlanBuilder:participant 识别逻辑保持 self-contained(扫描 OperatorChain 中实现了 CheckpointParticipant 接口的算子),同时将识别结果同步到 StreamComponents.checkpointParticipants(为 fingerprint 计算提供数据) - 端到端测试:TwoPhaseCommitSinkFunction 通过 CheckpointParticipant 接口参与 checkpoint(验证 prepareCommit 和 finishCommit 在正确的时机被调用,且 commit() 只被调用一次)
- 测试:participant.saveState() 抛出异常时 checkpoint abort
Exit Criteria:
- CheckpointPlanBuilder 从 StreamComponents.checkpointParticipants 提取 participant 列表,CheckPointPlan.checkpointParticipants 非空
- 算子快照阶段调用 participant.saveState()(在 operator.snapshotState() 内嵌调用)
- CheckpointCoordinator 在 checkpoint 完成时逆拓扑序调用每个 participant 的 finishCommit(epochId, true)
- TwoPhaseCommitSinkFunction.finishCommit(false) 不调用 rollback()(保留 prepared transaction)
- StreamSinkOperator.notifyCheckpointComplete() 对 CheckpointParticipant 算子不直接调用 commit()(避免 double-commit)
- 端到端验证:source → map → two-phase-commit-sink 端到端跑通,验证 checkpoint 周期中 participant 的 prepareCommit 和 finishCommit 被调用(通过计数器或标志位断言),且 commit() 恰好被调用一次
- 接线验证:在端到端测试中,TwoPhaseCommitSinkFunction.prepareCommit() 在 barrier 对齐后被调用,finishCommit() 在 checkpoint 完成后被调用
- 无静默跳过:participant.saveState() 抛出异常时 checkpoint abort(不吞掉异常)
- 所有现有 checkpoint 端到端测试通过
- 相关
ai-dev/design/nop-stream/checkpoint-design.md§3 已更新反映集成状态 -
ai-dev/logs/对应日期条目已更新
Phase 4 - ProcessingGuarantee 集成
Status: completed
Targets: nop-stream-core (InputGate, ProcessingGuarantee, CheckpointPlan, CheckpointConfig), nop-stream-runtime (GraphModelCheckpointExecutor, CheckpointPlanBuilder)
- Item Types:
Decision,Proof
当前状态:ProcessingGuarantee 枚举存在(4 值,含 barrierAlignment 属性——注意:AT_LEAST_ONCE 的 barrierAlignment 当前为 true,需修正为 false)。InputGate 的 barrier 对齐始终阻塞已收到 barrier 的 channel。CheckpointBarrierTracker 只做 operator 级 ACK 计数,不处理 channel 级 barrier 对齐。
关键澄清:CheckpointBarrierTracker 不处理 barrier 对齐(它只跟踪 operator ACK 计数)。实际 barrier 对齐逻辑在 InputGate 中(InputGate.blockedChannels 数组控制 channel 阻塞)。因此 ProcessingGuarantee 只需修改 InputGate,不需要修改 CheckpointBarrierTracker。
工作项:
- 修正
ProcessingGuarantee.AT_LEAST_ONCE的barrierAlignment属性为false(当前为true,与语义矛盾) - 修改
CheckpointPlanBuilder:从 CheckpointConfig 读取 processing guarantee 配置,填入 CheckpointPlan.processingGuarantee - 修改
InputGate:构造函数接受 ProcessingGuarantee 参数(nullable,默认 STRICT_EXACTLY_ONCE)。当 barrierAlignment=false(AT_LEAST_ONCE)时,收到 barrier 的 channel 不阻塞后续 records,直接放行。当 barrierAlignment=true(STRICT_EXACTLY_ONCE)时,保持当前阻塞行为 - 修改
GraphExecutionPlan.build()或StreamTaskInvokable:将 ProcessingGuarantee 从 CheckpointPlan 传给 InputGate 构造函数 - 在
CheckpointConfig中增加processingGuarantee字段(默认 STRICT_EXACTLY_ONCE) - 端到端测试:STRICT_EXACTLY_ONCE 模式下 barrier 对齐正确阻塞/放行
- 测试:AT_LEAST_ONCE 模式下已收到 barrier 的 channel 不阻塞
Exit Criteria:
- ProcessingGuarantee.AT_LEAST_ONCE.barrierAlignment == false(修正后)
- CheckpointPlanBuilder 填入 processingGuarantee(非 null,默认 STRICT_EXACTLY_ONCE)
- InputGate 根据 ProcessingGuarantee 调整 barrier 后 channel 的阻塞/放行
- 端到端验证:STRICT_EXACTLY_ONCE 模式下 source → map → sink 端到端跑通,数据无丢失无重复
- 端到端验证:AT_LEAST_ONCE 模式下端到端跑通
- 接线验证:在端到端测试中验证 ProcessingGuarantee 被传入 InputGate(通过配置不同的 guarantee 并观察 barrier 行为差异)
- 无静默跳过:未知 ProcessingGuarantee 值抛出异常
- 所有现有 checkpoint 测试通过(默认行为不变)
- 相关
ai-dev/design/nop-stream/checkpoint-design.md§4 已更新 -
ai-dev/logs/对应日期条目已更新
Phase 5 - EdgeConfig 流控集成
Status: completed
Targets: nop-stream-core (RecordWriter, InputGate, JobEdge), nop-stream-runtime (GraphExecutionPlan)
- Item Types:
Decision,Proof
当前状态:EdgeConfig、FlowControlPolicy、MemoryBudget 已创建。DeploymentPlanGenerator 生成 EdgeConfig.defaultConfig()。但 RecordWriter 和 InputGate 不知道 EdgeConfig 的存在。
工作项:
- 修改
JobEdge:增加EdgeConfig字段(可为 null 表示默认行为) - 修改
JobGraphGenerator:从 DeploymentPlan.edgeConfigs 或默认配置中读取 EdgeConfig,传入 JobEdge - 修改
GraphExecutionPlan.build():从 JobEdge 读取 EdgeConfig,传给 RecordWriter 构造函数 - 修改
RecordWriter:构造函数增加 EdgeConfig 参数(nullable),根据 flowControlPolicy 选择流控行为。BLOCKING_QUEUE(默认)= 当前行为(BlockingQueue.put 阻塞);CREDIT_BASED 和 ACK_WINDOW 暂抛 UnsupportedOperationException(分布式专用) - 修改
InputGate:构造函数增加 EdgeConfig 参数(nullable,预留接口,当前本地 runtime 只用 BLOCKING_QUEUE) - 端到端测试:BLOCKING_QUEUE 策略下 source → map → sink 端到端跑通
- 测试:队列满时 sender 阻塞(通过小容量队列 + 慢 consumer 验证)
Exit Criteria:
- JobEdge 持有 EdgeConfig(nullable)
- RecordWriter 根据 EdgeConfig.flowControlPolicy 选择流控行为
- BLOCKING_QUEUE 策略下队列满时 sender 阻塞
- 端到端验证:source → map → sink 在 BLOCKING_QUEUE 策略下端到端跑通
- 接线验证:RecordWriter 构造时接收 EdgeConfig,并根据策略选择不同的队列操作
- 无静默跳过:CREDIT_BASED 和 ACK_WINDOW 策略抛出 UnsupportedOperationException(非分布式 runtime 不支持)
- 本地 runtime 默认使用 BLOCKING_QUEUE,不影响现有测试
- 所有现有端到端测试通过
- 相关
ai-dev/design/nop-stream/architecture.md§6.5 已更新 -
ai-dev/logs/对应日期条目已更新
Phase 6 - 连接器一致性能力声明
Status: completed
Targets: nop-stream-connector (BatchLoaderSourceFunction, BatchConsumerSinkFunction, MessageSourceFunction, MessageSinkFunction, DebeziumCdcSourceFunction), nop-stream-core (StreamRequirementValidator)
- Item Types:
Decision,Proof
当前状态:SourceConsistencyCapability 和 SinkConsistencyCapability 枚举存在。5 个 connector 不声明任何能力。StreamRequirementValidator 不校验 source/sink 能力。
工作项:
- 在
SourceFunction接口(或子接口)中增加SourceConsistencyCapability getSourceConsistency()default 方法(返回 BEST_EFFORT) - 在
SinkFunction接口(或子接口)中增加SinkConsistencyCapability getSinkConsistency()default 方法(返回 AT_LEAST_ONCE) - BatchLoaderSourceFunction 声明
SourceConsistencyCapability.AT_LEAST_ONCE - BatchConsumerSinkFunction 声明
SinkConsistencyCapability.IDEMPOTENT - MessageSourceFunction 声明
SourceConsistencyCapability.AT_LEAST_ONCE - MessageSinkFunction 声明
SinkConsistencyCapability.AT_LEAST_ONCE - DebeziumCdcSourceFunction 声明
SourceConsistencyCapability.REPLAYABLE - 修改
StreamRequirementValidator:当 pipeline 声明 STRICT_EXACTLY_ONCE 时,校验所有 source 至少 REPLAYABLE、所有 sink 至少 TWO_PHASE_COMMIT。校验失败时抛出 StreamException - 测试:声明 STRICT_EXACTLY_ONCE 但 source 只有 AT_LEAST_ONCE 能力时构建失败
- 测试:声明 STRICT_EXACTLY_ONCE 且 source REPLAYABLE + sink TWO_PHASE_COMMIT 时构建成功
Exit Criteria:
- 5 个 connector 各自声明正确的一致性能力
- SourceFunction/SinkFunction 有 default 方法返回一致性能力(向后兼容)
- StreamRequirementValidator 校验 source/sink 能力与 ProcessingGuarantee 的匹配
- 端到端验证:source → map → sink(各 connector 使用真实能力声明)端到端跑通
- 接线验证:StreamRequirementValidator 在 execute() 中被调用,且读取了 source/sink 的能力声明
- 无静默跳过:能力不足时抛出异常,不静默降级
- 所有现有 connector 测试通过
- 相关
ai-dev/design/nop-stream/connector-design.md已更新 -
ai-dev/logs/对应日期条目已更新
Phase 7 - EpochManifest 持久化与恢复
Status: completed
Targets: nop-stream-runtime (CheckpointCoordinator, GraphModelCheckpointExecutor), nop-stream-core (EpochManifest, ICheckpointStorage)
- Item Types:
Decision,Proof
当前状态:EpochManifest 数据类存在(含 epochId, jobId, pipelineId, taskSnapshots, planFingerprint 等字段)。但 CheckpointCoordinator 不生成 EpochManifest,使用 CompletedCheckpoint 代替。恢复时从 CompletedCheckpoint 读取。
工作项:
- 修改
CheckpointCoordinator:在所有 task ACK 收齐后,从 PendingCheckpoint 数据构建 EpochManifest(而非只构建 CompletedCheckpoint)。EpochManifest 填充已有字段:planFingerprint(来自 PartitionedPlan,Phase 1 已传递)、taskSnapshots 映射、epochId、jobId、pipelineId - 修改
ICheckpointStorage:增加storeEpochManifest(EpochManifest)和loadLatestEpochManifest(String jobId, String pipelineId)方法(保留现有 CompletedCheckpoint 方法向后兼容) - 修改
LocalFileCheckpointStorage和JdbcCheckpointStorage:实现 EpochManifest 的存储和加载 - 修改
GraphModelCheckpointExecutor恢复路径:优先从 EpochManifest 恢复(如果存在),否则 fallback 到 CompletedCheckpoint - 端到端测试:checkpoint 周期完成后 EpochManifest 被持久化(验证存储中有对应文件/记录)
- 端到端测试:恢复时从 EpochManifest 读取状态并正确恢复
Exit Criteria:
- CheckpointCoordinator 在 checkpoint 完成时生成 EpochManifest 并持久化
- EpochManifest 包含 planFingerprint、sourceOffsets、taskSnapshots 映射
- ICheckpointStorage 新增 EpochManifest 存储方法
- 端到端验证:source → map → sink 完成 checkpoint 后,EpochManifest 被写入存储(可从存储中读取验证)
- 端到端验证:从 EpochManifest 恢复后,状态与 checkpoint 前一致
- 接线验证:CheckpointCoordinator 在 ACK 收齐后调用 storeEpochManifest();恢复时调用 loadLatestEpochManifest()
- 无静默跳过:EpochManifest 持久化失败时 checkpoint 标记为失败(不静默继续)
- 所有现有 checkpoint 恢复测试通过
- 相关
ai-dev/design/nop-stream/checkpoint-design.md§2.6 §9 已更新 -
ai-dev/logs/对应日期条目已更新
Phase 8 - Fingerprint 兼容性校验与 JobTerminationMode
Status: completed
Targets: nop-stream-core (StreamModelFingerprint, StreamModel), nop-stream-runtime (GraphModelCheckpointExecutor, CheckpointCoordinator)
- Item Types:
Decision,Proof
当前状态:StreamModel.computeFingerprint() 已存在(hash transformations + topology + requirements + checkpointParticipants via SHA-256)。StreamModelFingerprint.isCompatibleWith() 已实现。JobTerminationMode 和 JobTerminationContext 已创建。但 savepoint 恢复时不校验 fingerprint,JobTerminationMode 不影响作业终止行为。
工作项:
- 修改 savepoint 恢复路径(
GraphModelCheckpointExecutor.restoreFromCheckpoint()):加载 EpochManifest 后,调用StreamModel.computeFingerprint()计算当前 fingerprint,与EpochManifest.planFingerprint比较。不兼容时抛出 StreamException 拒绝恢复。注意:恢复时需要 StreamModel 实例,需要将其保存到 GraphModelCheckpointExecutor 字段中(Phase 1 已传递) - 修改 CheckpointConfig:增加
jobTerminationMode字段(默认 CANCEL),增加 JobTerminationContext 构建方法 - 修改
GraphModelCheckpointExecutor作业结束逻辑:DRAIN 模式触发 terminal checkpoint(TERMINAL_SAVEPOINT 类型,source 停止发送新数据后等待 in-flight 数据处理完);SUSPEND 模式触发 savepoint 后停止 source;CANCEL 模式直接停止(当前行为);EXPORT_SAVEPOINT 触发 savepoint 但不停止作业 - 端到端测试:savepoint 恢复时 fingerprint 不匹配抛出异常
- 端到端测试:DRAIN 模式下作业在 terminal checkpoint 完成后停止
- 端到端测试:SUSPEND 模式下作业在 savepoint 完成后暂停
Exit Criteria:
- savepoint 恢复时校验 fingerprint 兼容性,不兼容时拒绝恢复
- CheckpointConfig 包含 jobTerminationMode
- DRAIN 模式触发 TERMINAL_SAVEPOINT 类型 checkpoint
- SUSPEND 模式触发 savepoint 后停止 source
- 端到端验证:savepoint → 修改管线 topology → 恢复失败(fingerprint 不兼容)
- 端到端验证:DRAIN 模式下 source 处理完所有数据后 checkpoint 并正常结束
- 接线验证:恢复路径读取 EpochManifest.planFingerprint 并与当前 fingerprint 比较
- 无静默跳过:fingerprint 不匹配时抛出异常,不静默继续
- 所有现有 savepoint 测试通过
- 相关
ai-dev/design/nop-stream/checkpoint-design.md§7 已更新 -
ai-dev/logs/对应日期条目已更新
Phase 9 - 全链路集成验证
Status: completed
Targets: nop-stream-core, nop-stream-runtime, nop-stream-connector
- Item Types:
Proof
目标:验证 Phase 1-8 的所有集成点在端到端路径上完整连通。本 Phase 不新增功能,只做集成验证和修复。
工作项:
- 编写全链路端到端测试:source(REPLAYABLE) → map → keyBy → window → two-phase-commit-sink(TWO_PHASE_COMMIT),ProcessingGuarantee=STRICT_EXACTLY_ONCE,完成完整 checkpoint 周期(触发→对齐→participant saveState→快照→EpochManifest 持久化→participant finishCommit)
- 编写全链路恢复测试:从 EpochManifest 恢复后继续处理,验证状态一致性
- 编写全链路 DRAIN 测试:DRAIN 模式下 terminal checkpoint 完成后作业正常结束
- 编写全链路 fingerprint 测试:savepoint 恢复时 fingerprint 匹配成功、不匹配失败
- 修复集成测试中发现的任何接通问题
- 完整回归测试:
./mvnw test -pl nop-stream -am
Exit Criteria:
- 全链路端到端测试通过(包含所有新集成点的验证断言)
- 全链路恢复测试通过
- 全链路 DRAIN 测试通过
- 全链路 fingerprint 测试通过
- 端到端验证:从
env.addSource()到 sink 输出到 checkpoint 恢复的完整路径跑通 - 接线验证:Anti-Hollow 清单——逐个确认关键模型类在运行时被调用:
- PartitionedPlan 被 execute() 生成并被消费
- DeploymentPlan 被 execute() 生成并被消费
- StateShard 被 MemoryKeyedStateBackend 使用(shardCount>1 场景)
- CheckpointParticipant.saveState() 被算子快照阶段调用
- CheckpointParticipant.finishCommit() 被 CheckpointCoordinator 调用
- ProcessingGuarantee 被 InputGate 读取
- EdgeConfig 被 RecordWriter 读取
- EpochManifest 被 CheckpointCoordinator 生成并被存储
- JobTerminationMode 影响 GraphModelCheckpointExecutor 终止行为
- StreamModelFingerprint 在恢复时被校验
- SourceConsistencyCapability/SinkConsistencyCapability 被 StreamRequirementValidator 校验
- 无静默跳过:Anti-Hollow 清单——确认无空方法体、无吞异常、无 no-op 作为正常实现
-
./mvnw test -pl nop-stream -am全部通过 -
ai-dev/logs/对应日期条目已更新
Phase Status
| Phase | 名称 | Depends on | Status |
|---|---|---|---|
| 1 | 执行管线扩展 | 无 | completed |
| 2 | StateShard 路由 | 无 | completed |
| 3 | CheckpointParticipant 集成 | Phase 1 | completed |
| 4 | ProcessingGuarantee 集成 | Phase 3 | completed |
| 5 | EdgeConfig 流控 | Phase 1 | completed |
| 6 | 连接器一致性能力声明 | 无 | completed |
| 7 | EpochManifest 持久化与恢复 | Phase 3 | completed |
| 8 | Fingerprint 校验与 JobTerminationMode | Phase 7 | completed |
| 9 | 全链路集成验证 | Phase 1-8 | completed |
并行可能性:Phase 2、Phase 5、Phase 6 互相独立,可与 Phase 1 并行执行。Phase 3 依赖 Phase 1。Phase 4 依赖 Phase 3。Phase 7 依赖 Phase 3。Phase 8 依赖 Phase 7。Phase 9 依赖所有其他 Phase。
Closure Gates
关闭条件:只有本 section 所有条目以及每个 Phase 的 Exit Criteria 全部勾选为
[x]后,才能将Plan Status改为completed。
- 所有 9 个 Phase 的 Exit Criteria 全部满足
- 现有端到端测试(source → map → sink)通过
- 不存在被静默降级到 deferred / follow-up 的 in-scope live defect 或 contract drift
- 受影响的 owner docs(
ai-dev/design/6 个文档)已同步到 live baseline - 独立子 agent closure-audit 已完成并记录证据
- Anti-Hollow Check:closure audit 已验证(a)组件间调用链在运行时确实连通(不只是类型系统),(b)无空方法体/静默跳过/no-op 作为正常实现。具体验证:Phase 9 的 Anti-Hollow 清单全部通过
-
./mvnw compile -pl nop-stream -am -
./mvnw test -pl nop-stream -am - checkstyle / 代码规范检查通过
-
ai-dev/logs/记录每个 Phase 的执行日志
Deferred But Adjudicated
分布式 runtime (RuntimeNode / TaskAttempt / NodeLease / ClusterRegistry / fencing token)
- Classification:
out-of-scope improvement - Why Not Blocking Closure: 分布式 runtime 是独立架构层,本计划只实现本地 runtime 的模型和协议集成
- Successor Required:
yes - Successor Path: 待定,需要独立的设计文档
barrier 注入方式迁移(scheduler-push → source-pull)
- Classification:
out-of-scope improvement - Why Not Blocking Closure: 当前 scheduler-push 模式在单进程 runtime 下语义正确。source-pull 是分布式场景的优化,不影响 exactly-once 正确性
- Successor Required:
yes - Successor Path: 待分布式 runtime 计划启动时实现
WindowingStrategy 对 WindowOperator 的运行时改造
- Classification:
optimization candidate - Why Not Blocking Closure: 当前 WindowOperator 通过构造函数参数配置窗口行为,功能完整。WindowingStrategy 的运行时查找是模型层抽象优化,不影响窗口计算正确性
- Successor Required:
yes - Successor Path: 待 WindowOperator 需要跨 backend 移植时实现
Remote state service / RocksDB / Redis 后端
- Classification:
out-of-scope improvement - Why Not Blocking Closure: MemoryStateBackend + StateShard 已足够验证状态路由正确性
- Successor Required:
yes - Successor Path: 待定
Rescale / state redistribution
- Classification:
optimization candidate - Why Not Blocking Closure: 并行度固定场景下不需要 rescale,StateShard 模型已预留扩展空间
- Successor Required:
yes - Successor Path: 待定
CREDIT_BASED / ACK_WINDOW 流控策略实现
- Classification:
out-of-scope improvement - Why Not Blocking Closure: 这两种策略只在分布式 runtime 中需要。本地 runtime 使用 BLOCKING_QUEUE 已满足需求。本计划在 RecordWriter 中对这两种策略抛出 UnsupportedOperationException
- Successor Required:
yes - Successor Path: 待分布式 runtime 计划启动时实现
分布式专用模型类(未在本计划中集成)
以下 Plan 42 创建的模型类是分布式 runtime 或高级协议专用,本地 runtime 不需要:
-
SavepointMetadata— savepoint 的元数据容器,分布式恢复场景使用 -
SourceEnumeratorState— 分布式 source 的 split 注册表和分配状态 -
StateSegmentDescriptor— 状态段描述符,分布式远程状态服务场景使用 -
EpochState— epoch 生命周期状态枚举,分布式 coordinator HA 场景使用 -
TaskEpochSnapshot— Task 级 epoch 快照(shard/timer/participant 维度),分布式恢复场景使用 -
PaneInfo/PaneState— 窗口 Pane 状态,WindowingStrategy 运行时集成时使用 -
DrainableSource/WatermarkEstimator/DynamicSplitRequest/Response/RestrictionTracker— 分布式 source 协议接口 -
Classification:
out-of-scope improvement -
Why Not Blocking Closure: 这些模型类服务于分布式 runtime、远程状态服务、source split 协议等尚未实现的架构层。本地 runtime 不需要这些类参与运行时路径。类型已定义,为未来分布式 runtime 提供了类型基础。
-
Successor Required:
yes -
Successor Path: 待分布式 runtime 计划启动时集成
Non-Blocking Follow-ups
- CEP operator 对接标准状态后端
- nop-stream-flow(XDSL 编排)
- nop-stream-flink(外部后端适配)
- Connector 协议接口迁移到 api 模块(待 api 模块创建)
- 分布式专用模型类集成(见 Deferred But Adjudicated)
Closure
Status Note: All 9 phases completed. Independent closure audit verified all exit criteria met, anti-hollow checks passed, all integration points confirmed at runtime.
Closure Audit Evidence:
- Reviewer / Agent: Independent sub-agent (GLM-5.1, separate task)
- Evidence: Full source code audit confirmed all changes exist and are substantive. Test suite passes.
Follow-up:
- CEP operator 对接标准状态后端
- nop-stream-flow(XDSL 编排)
- nop-stream-flink(外部后端适配)
- Connector 协议接口迁移到 api 模块(待 api 模块创建)
- 分布式专用模型类集成(见 Deferred But Adjudicated)