67 nop-stream 关键正确性修复
Plan Status: completed Last Reviewed: 2026-05-28 Source: ai-dev/audits/2026-05-28-deep-audit-nop-stream-full/summary.md + ai-dev/audits/2026-05-28-adversarial-review-nop-stream-r2/report.md Related: 62-nop-stream-p0-fixes-and-wiring, 63-nop-stream-critical-test-coverage, 64-nop-stream-error-codes-and-type-safety, 66-nop-stream-error-handling-hardening
Purpose
将 2026-05-28 两份独立审计(21 维度系统审计 + 对抗性审查 Round 5)发现的所有 P0/P1 缺陷修复到可验证状态,使 nop-stream 核心引擎具备基本的正确性和安全性保证。
Current Baseline
- nop-stream 包含 10 个子模块(core, cep, connector, runtime, fraud-example + 4 个 placeholder + 1 个 flink 适配)
- 测试覆盖:1,309 个测试方法,38,245 行测试代码,整体通过
- 已修复的前轮问题:N73-N84, N94-N105 已在先前 plan 中处理
- 21 维度系统审计报告 0 个 P0、8 个 P1(13-01/14-01~14-04/16-01/20-01/21-01)。对抗性审查 Round 5 额外发现 5 个 P0(N106-N110)和若干 P1(N111-N120)。 本 plan 统一修复两类审计的所有 P0+P1 问题(不含 21-01 低价值测试重构)。
- 系统审计还发现约 31 个 P2 和 17 个 P3 问题,不在本 plan 范围内
Goals
- 修复所有 P0 缺陷(状态丢失/执行崩溃):N106, N107, N108, N109, N110
- 修复所有 P1 缺陷(正确性/安全/资源):N111-N118, 13-01, 14-01, 14-02/03/04, 16-01, 20-01
- 每个 P0/P1 修复必须附带可验证的单元测试
- 修复完成后
./mvnw test -pl nop-stream -am全部通过
Non-Goals
- P2/P3 问题(代码风格、命名一致性、文档更新、模块拆分等)留给后续 plan
- 低价值测试重构(21-01 Transformation getter/setter 测试清理)
- 性能优化和 MemoryKeyedStateBackend 线程安全改造(14-06)
- 模块拆分和架构重构(02-01/02-02/02-04)
- 文档对齐(18-01/02/03/04/05, 19-01/03/04)
- nop-stream-flink 适配层审查
- Flink 上游许可证合规性评估
Risks And Rollback
- 回归风险:N106(状态序列化格式变更)和 N113(GraphExecutionPlan 分区逻辑)可能影响现有功能。每个 Phase 完成后立即运行
./mvnw test -pl nop-stream -am验证 - 回滚策略:按 Phase 独立提交 git,如有回归可精确 revert 单个 Phase
Scope
In Scope
- MemoryKeyedStateBackend snapshot/restore 序列化盲区(N106)
- WindowOperatorTimerService 跨 key 误删 timer(N107)
- InputGate 多通道超时即退出(N108)
- SubtaskTask deep copy chain 未 open(N109)
- WindowOperator 非合并路径 trigger state 泄漏(N110)
- NFA.startTimestamp > 0 遗漏 timestamp=0(N111)
- CheckpointBarrierTracker operator state key 格式不匹配(N112)
- GraphExecutionPlan fan-out 拓扑数据路由错误(N113)
- SharedBufferAccessor DFS 中间节点 NPE(N114)
- SkipPastLastStrategy/SkipToNextStrategy 空列表 IOOBE(N115)
- DebeziumCdcSourceFunction 非序列化 CountDownLatch(N116)
- StreamSinkOperator.close() 不调 super(N117)
- BatchConsumerSinkFunction.flush() 传 null chunk context(N118)
- SharedBuffer 无用 Timer 浪费线程(N119)
- NFACompiler 递归无环检测(N120)
- LocalFileCheckpointStorage 路径遍历漏洞(13-01)
- GraphModelCheckpointExecutor 线程池泄漏(14-01)
- CheckpointCoordinator 竞态条件(14-02/03/04)
- PartitionRouter 零测试(16-01)
- CepOperator 状态后端 fallback(20-01)
Out Of Scope
- 所有 P2/P3 问题(见 Non-Goals 完整列表)
- 影响范围超出 nop-stream 的变更
Execution Plan
按依赖关系和风险等级分为 5 个 Phase,每个 Phase 可独立验证。Phase 依赖关系:Phase 1 → Phase 3(共享 GraphModelCheckpointExecutor 代码区域);Phase 2/4/5 互相独立。
Phase 1 - 状态后端序列化与恢复正确性(N106 + N112)
Status: completed
Targets: nop-stream-core/.../state/backend/memory/MemoryKeyedStateBackend.java, nop-stream-core/.../checkpoint/CheckpointBarrierTracker.java, nop-stream-runtime/.../execution/GraphModelCheckpointExecutor.java
-
Item Types:
Fix,Proof -
Fix N106 - snapshotState(): 在 instanceof 链中添加 MemoryReducingState 和 MemoryAggregatingState 分支:
- MemoryReducingState: storage 类型为
Map<TypedNamespaceAndKey, SimpleAccumulator<T>>。序列化策略:stateType="ReducingState",保存valueType(从 descriptor 获取)和accumulatorType(从 descriptor 获取),entries 中保存每个 namespace+key 对应的 accumulator 的getLocalValue()值(因为SimpleAccumulator实现Serializable,但为保持与现有 snapshot 模式一致,保存本地值而非整个 accumulator)。复用现有的serializeNamespace/serializeKey/unwrapStorageKey辅助方法 - MemoryAggregatingState: storage 类型为
Map<TypedNamespaceAndKey, ACC>。序列化策略:stateType="AggregatingState",保存valueType(从 descriptor 获取)和aggregateFunctionType(descriptor.getAggregateFunction().getClass().getName()),entries 中保存 ACC 值(ACC 必须是可序列化的,否则在 add() 时就会失败)。恢复时通过 aggregateFunctionType 反射创建 AggregateFunction 实例(前提条件:AggregateFunction 实现类必须有无参构造器,这是流处理框架中的标准要求),然后创建 AggregatingStateDescriptor 和 MemoryAggregatingState 实例 - 在 instanceof 链末尾添加 else 分支:遇到未知状态类型时抛出
StreamException(ERR_STREAM_STATE_ERROR),不再静默跳过
- MemoryReducingState: storage 类型为
-
Fix N106 - restoreState(): 在 switch 中添加 "ReducingState" 和 "AggregatingState" 分支:
- ReducingState: 通过 valueType 和 accumulatorType 反射创建 ReducingStateDescriptor,创建 MemoryReducingState 实例(传入
this作为 backend),遍历 entries 调用state.add(deserializeValue(value, valueClass))恢复每个 key 的值 - AggregatingState: 通过 valueType 和 aggregateFunctionType 反射创建 AggregatingStateDescriptor 和 AggregateFunction,创建 MemoryAggregatingState 实例,遍历 entries 调用
state.add(deserializeValue(value, valueClass))(因为 add() 会通过 AggregateFunction 累积到 ACC) - default 分支改为抛出 StreamException 而非 IOException
- ReducingState: 通过 valueType 和 accumulatorType 反射创建 ReducingStateDescriptor,创建 MemoryReducingState 实例(传入
-
Fix N106 - rebindStateBackends(): 在 rebindStateBackends() 的 instanceof 链中添加 MemoryReducingState 和 MemoryAggregatingState 分支,调用对应的
rebind(this) -
Fix N112: 修复 CheckpointBarrierTracker.acknowledgeOperator() 中 operator state key 存取不一致。策略选择:在恢复侧使用与保存侧相同的 key 组合逻辑。具体做法:在
GraphModelCheckpointExecutor.buildSnapshotFromTaskState()中,恢复 operator state 时遍历 snapshot 的所有 key,找到以opStateKey + "-"为前缀的 key,提取后缀部分作为 state key 名,恢复到对应 operator 的 state map 中 -
Proof N106: 添加 TestMemoryKeyedStateBackendSnapshotRestore 测试,覆盖:(1) ReducingState 的 snapshot→restore 往返(验证 add 值在 restore 后 get 返回相同值);(2) AggregatingState 的 snapshot→restore 往返(验证聚合结果在 restore 后一致);(3) 多 namespace/key 场景;(4) 混合状态类型(ValueState + ReducingState 同时存在)的完整 snapshot/restore
-
Proof N112: 添加或扩展测试验证 operator state(如 trigger-accumulators)在 checkpoint 恢复后可正确读回
Exit Criteria:
- MemoryKeyedStateBackend.snapshotState() 覆盖所有 7 种已注册的状态类型(ValueState, MapState, ListState, InternalAppendingState, InternalListState, ReducingState, AggregatingState),遇到未知类型抛出 StreamException 而非静默跳过
- MemoryKeyedStateBackend.restoreState() 能正确恢复所有 7 种状态类型
- MemoryKeyedStateBackend.rebindStateBackends() 覆盖所有 7 种状态类型
- 新增测试
TestMemoryKeyedStateBackendSnapshotRestore通过,验证 ReducingState/AggregatingState 的 snapshot→restore 不丢失数据 - CheckpointBarrierTracker 的 operator state key 存取路径已统一,恢复时能通过前缀匹配找到
trigger-accumulators等 state key - 无静默跳过: snapshotState/restoreState 遇到未知类型时抛出 StreamException
-
./mvnw test -pl nop-stream/nop-stream-core -am通过 - No owner-doc update required(纯 bug fix)
-
ai-dev/logs/对应日期条目已更新
Phase 2 - 窗口算子与 CEP 正确性(N107 + N110 + N111 + N114 + N115 + N120)
Status: completed
Targets: nop-stream-runtime/.../WindowOperatorTimerService.java, nop-stream-runtime/.../WindowOperator.java, nop-stream-cep/.../nfa/NFA.java, nop-stream-cep/.../sharedbuffer/SharedBufferAccessor.java, nop-stream-cep/.../aftermatch/SkipPastLastStrategy.java, nop-stream-cep/.../aftermatch/SkipToNextStrategy.java, nop-stream-cep/.../compiler/NFACompiler.java
-
Item Types:
Fix,Proof -
Fix N107: 在 WindowOperatorTimerService.deleteEventTimeTimer/deleteProcessingTimeTimer 的 removeIf lambda 中增加 key 过滤:
timer.getNamespace().equals(namespace) && timer.getTimestamp() == time && Objects.equals(timer.getKey(), currentKeySupplier != null ? currentKeySupplier.get() : null) -
Fix N110: 在 WindowOperator.processElement 非合并路径中,当
triggerResult.isPurge()时在clearWindowContents(key, window)之后增加trigger.clear(window, triggerCtx)调用;在 onEventTime/onProcessingTime cleanup 路径中(约 L522-530, L578-586),在clearWindowContents后同样增加trigger.clear(window, triggerCtx)调用 -
Fix N111: 将 NFA.java:379 的
getStartTimestamp() > 0改为getStartTimestamp() >= 0 -
Fix N114: 在 SharedBufferAccessor.extractPatterns() DFS 循环中,将
sharedBuffer.getEntry(target).getElement()改为先获取 entry,null 检查后调用 getElement:Lockable<SharedBufferNode> entry = sharedBuffer.getEntry(target); if (entry == null) continue; ...entry.getElement()... -
Fix N115: 在 SkipPastLastStrategy.getPruningId() 和 SkipToNextStrategy.getPruningId() 中增加
if (eventList.isEmpty()) return pruningId;保护 -
Fix N120: 在 NFACompiler.copyWithoutTransitiveNots() 中添加
Set<String> visited参数(初始为空),递归前检查visited.contains(state.getName()),若已访问则抛出 StreamException("Circular PROCEED dependency detected");每次递归调用前将 state name 加入 visited -
Proof: 为每个修复添加对应单元测试
-
端到端测试: 添加 TestWindowOperatorMultiKeyCleanup 集成测试:构建
source → keyBy → window(TumblingEventTimeWindows) → reduce → sink的完整 pipeline,使用 3 个 key、触发多轮 window fire+purge,验证:(1) timer 列表中不残留已清理 key 的 timer;(2) trigger state map 不无限增长
Exit Criteria:
- WindowOperatorTimerService.delete 方法只删除匹配 key+namespace+timestamp 的 timer
- WindowOperator 非合并路径在 FIRE_AND_PURGE 和 cleanup timer 触发时都调用 trigger.clear()
- NFA 在 timestamp=0 时也注册 cleanup timer
- SharedBufferAccessor.extractPatterns() DFS 中间节点为 null 时安全跳过
- SkipPastLastStrategy/SkipToNextStrategy 在空 eventList 时不抛 IOOBE
- NFACompiler 递归有环检测,不会 StackOverflow
- 每个修复有对应单元测试通过
- 端到端验证: TestWindowOperatorMultiKeyCleanup 测试从 source 到 sink 完整 pipeline 验证 timer 清理和 trigger state 不泄漏
-
./mvnw test -pl nop-stream/nop-stream-runtime,nop-stream/nop-stream-cep -am通过 - No owner-doc update required(纯 bug fix)
-
ai-dev/logs/对应日期条目已更新
Phase 3 - 执行引擎正确性(N108 + N109 + N113)
Status: completed
Targets: nop-stream-core/.../execution/InputGate.java, nop-stream-core/.../SubtaskTask.java, nop-stream-core/.../GraphExecutionPlan.java
-
Item Types:
Fix,Proof -
Fix N108: 在 InputGate.readMultiChannel() 方法中,在外层添加
while (!isAllFinished())循环,每次循环做一轮完整 round-robin。一轮结束后如果所有通道无数据但未全部完成,Thread.sleep(10)短暂等待后继续下一轮。channelsChecked计数器在每轮开始时重置为 0。当isAllFinished()为 true 时才返回Optional.empty() -
Fix N109: 修改 SubtaskTask 使用独立的 OperatorChain 列表而非从共享 jobVertex 获取。具体做法:(1) SubtaskTask 添加
List<OperatorChain> operatorChains字段和构造参数SubtaskTask(Subtask subtask, JobVertex jobVertex, List<OperatorChain> chains);(2) openOperatorChains() 和 closeOperatorChains() 都使用传入的 chain 列表而非jobVertex.getOperatorChains();(3) GraphModelCheckpointExecutor.buildTasks() 中创建 SubtaskTask 时,将 deep-copied chain 列表传入。注意:GraphExecutionPlan.build() 中 L187-189 已有 deepCopy 逻辑,此处只需将 deep-copied chain 传递到 SubtaskTask 构造参数 -
Fix N113: 修改 GraphExecutionPlan.build() 中多出边处理逻辑。策略:创建一个 BroadcastingOutput 包装类,内含多个 RecordWriterOutput(每条出边一个)。wireTailToRecordWriter 中:单出边保持现有逻辑不变;多出边时为每条出边创建独立的 RecordWriter + RecordWriterOutput,然后用 BroadcastingOutput 包装作为 tail operator 的 output。StreamTaskInvokable 的 outputWriter 字段改为使用 BroadcastingOutput 内部的 primary writer(或保留不变,因为数据发送通过 tail operator 的 output 接口完成)。这样不修改 StreamTaskInvokable 的核心接口
-
Proof N108: 添加 TestInputGateMultiChannel 测试,使用 mock InputChannel(poll 超时 > 50ms),验证 readMultiChannel 在上游未完成时不会返回 empty
-
Proof N109: 添加测试验证 parallelism=2 时:(1) 两个 SubtaskTask 各持有不同的 OperatorChain 实例;(2) 两个 chain 的 open() 都被调用
-
Proof N113: 添加 TestGraphExecutionPlanFanOut 测试,构建 1→2 的 fan-out 拓扑,验证数据路由到所有下游分区
Exit Criteria:
- InputGate 多通道模式:上游未完成时不返回 empty,持续轮询直到有数据或所有通道完成
- SubtaskTask parallelism > 1 时,deep-copied OperatorChain 的 open() 和 close() 都使用传入的独立 chain 列表
- GraphExecutionPlan fan-out 拓扑通过 BroadcastingOutput 路由到所有下游分区;单出边场景不受影响
- 3 个新增测试通过
- 端到端验证: 构建包含多输入通道 + parallelism > 1 的集成场景测试
-
./mvnw test -pl nop-stream/nop-stream-core -am通过 - No owner-doc update required(纯 bug fix)
-
ai-dev/logs/对应日期条目已更新
Phase 4 - 安全漏洞与资源管理(13-01 + 14-01 + 14-02/03/04 + 14-05 + N119)
Status: completed
Targets: nop-stream-runtime/.../checkpoint/storage/LocalFileCheckpointStorage.java, nop-stream-runtime/.../execution/GraphModelCheckpointExecutor.java, nop-stream-runtime/.../checkpoint/CheckpointCoordinator.java, nop-stream-runtime/.../checkpoint/PendingCheckpoint.java, nop-stream-core/.../execution/TaskExecutor.java, nop-stream-cep/.../sharedbuffer/SharedBuffer.java
-
Item Types:
Fix,Proof -
Fix 13-01: 在 LocalFileCheckpointStorage 中添加路径遍历防护(canonical path validation + ID regex)
-
Fix 14-01: GraphModelCheckpointExecutor.shutdown() 接受并关闭 TaskExecutor
-
Fix 14-02: CheckpointCoordinator.startCheckpointScheduler() 改为 synchronized
-
Fix 14-03: PendingCheckpoint.acknowledgeTask() 改为 synchronized
-
Fix 14-04: PendingCheckpoint 引入 AtomicReference 状态机(RUNNING/COMPLETED/ABORTED)
-
Fix 14-05: TaskExecutor 使用 daemon thread factory
-
Fix N119: 移除 SharedBuffer 中无用的 cacheStatisticsTimer
-
Proof 13-01: TestLocalFileCheckpointStoragePathTraversal(9 tests)
-
Proof 14-01~05: TestCheckpointConcurrencySafety(3 tests)
-
Proof 14-03/04: TestPendingCheckpointStateMachine(5 tests)
-
Proof 14-05: TestTaskExecutorDaemonThreads(1 test)
Exit Criteria:
- LocalFileCheckpointStorage 拒绝
../路径和 baseDir 外路径,路径遍历攻击被阻止 - GraphModelCheckpointExecutor execute 方法执行后 TaskExecutor 线程池被关闭
- CheckpointCoordinator.startCheckpointScheduler() 是 synchronized 方法,并发调用不会创建多个 scheduler
- PendingCheckpoint.acknowledgeTask() 是 synchronized 方法,并发调用不会导致不一致快照
- PendingCheckpoint 的 complete 和 abort 操作通过 AtomicReference 状态机互斥
- TaskExecutor 创建的线程为 daemon 线程
- SharedBuffer 不再创建无用的 Timer 线程
- 新增安全测试和并发测试通过
-
./mvnw test -pl nop-stream/nop-stream-runtime,nop-stream/nop-stream-cep -am通过 - No owner-doc update required(纯 bug fix + 安全加固)
-
ai-dev/logs/对应日期条目已更新
Phase 5 - Connector 与跨模块契约修复(N116 + N117 + N118 + 20-01 + 16-01)
Status: completed
Targets: nop-stream-connector/.../DebeziumCdcSourceFunction.java, nop-stream-core/.../StreamSinkOperator.java, nop-stream-connector/.../BatchConsumerSinkFunction.java, nop-stream-cep/.../operator/CepOperator.java, nop-stream-core/.../execution/PartitionRouter.java 及三个实现类
-
Item Types:
Fix,Proof -
Fix N116: DebeziumCdcSourceFunction completionLatch 改为 transient volatile,添加 initCompletionLatch() 懒初始化
-
Fix N117: StreamSinkOperator.close() 开头添加 super.close() 调用
-
Fix N118: BatchConsumerSinkFunction.flush() 传入 BatchChunkContextImpl 而非 null
-
Fix 20-01: CepOperator fallback 到 MemoryKeyedStateBackend 时打印 WARN 日志(保留 fallback 以兼容无 checkpoint 测试场景)
-
Fix 16-01 HashPartitionRouter: 将
Math.abs(channel % numPartitions)改为Math.floorMod(channel, numPartitions) -
Proof 16-01: TestPartitionRouter(7 tests)覆盖 HashPartitionRouter 正常/负数/Integer.MIN_VALUE/单分区,ForwardPartitionRouter,RebalancePartitionRouter round-robin/溢出
Exit Criteria:
- DebeziumCdcSourceFunction 的 completionLatch 为 transient,反序列化后通过 initCompletionLatch 正确初始化
- StreamSinkOperator.close() 调用 super.close(),RichFunction 生命周期方法被执行
- BatchConsumerSinkFunction.flush() 传入有效的 IBatchChunkContext
- CepOperator 未配置 state backend 时打印 WARN 日志,不再静默 fallback
- HashPartitionRouter 在 Integer.MIN_VALUE 时使用 Math.floorMod 返回非负分区索引
- PartitionRouter 及三个实现有完整单元测试
-
./mvnw test -pl nop-stream -am通过 - No owner-doc update required(纯 bug fix)
-
ai-dev/logs/对应日期条目已更新
Closure Gates
- 所有 P0 缺陷(N106-N110)已修复,有测试验证
- 所有 P1 缺陷(N111-N118, 13-01, 14-01~14-04, 16-01, 20-01)已修复,有测试验证
- 不存在被静默降级到 deferred / follow-up 的 in-scope live defect
-
./mvnw test -pl nop-stream -am全部通过 - checkstyle / 代码规范检查通过
- 独立子 agent closure-audit 已完成并记录证据
- Anti-Hollow Check: 修复的每个组件在运行时调用链中确实被使用,无空壳实现
-
ai-dev/logs/收口记录已更新
Deferred But Adjudicated
P2/P3 问题集(约 48 项)
- Classification:
out-of-scope improvement - Why Not Blocking Closure: 这些问题不影响核心引擎的正确性、安全性和基本可靠性。P2 是改进建议,P3 是低优先级打磨。单独 plan 处理更合适。
- Successor Required: yes
- Successor Path: 新建 plan
68-nop-stream-p2-quality-improvements
低价值测试重构(21-01)
- Classification:
optimization candidate - Why Not Blocking Closure: 现有测试虽保护力弱但不影响正确性,重构测试是资源优化行为
- Successor Required: yes
- Successor Path: 可纳入
68-nop-stream-p2-quality-improvements
Non-Blocking Follow-ups
- 文档-代码一致性修复(18-01~05):component-roadmap.md 中 ExecutionPlan→GraphExecutionPlan,architecture.md RuntimeTopology 标注
- 命名一致性修复(19-01~04):管线术语统一、connector 异常构造统一
- 模块依赖清理(01-01/02):nop-stream-cep 移除未用的 nop-xlang 依赖,nop-stream-connector 将 nop-message-core 改为 test scope
- 模块职责优化(02-01~04):MemoryKeyedStateBackend 内部类拆分、GraphModelCheckpointExecutor 模板方法提取
Closure
Status Note: All 5 phases completed. Commits: a3974e24e (P1), 823fb4970 (P2), 48fe2cbc3 (P3), e5400c7c6 (P4), 900f84383 (P5). Full test suite ./mvnw test -pl nop-stream -am passes. Closure audit pending.
Closure Audit Evidence:
- Reviewer / Agent: Independent closure audit (subagent ses_1912ffad8ffevNQo6eH1kFaV2k)
- Evidence: All 22 fixes verified in live source code. Phase 1-5 all PASS. Minor notes:
- 3 test files named differently than plan but equivalent coverage exists
- N120 cycle detection (private method) has no dedicated unit test; fix verified in source
- Overall Verdict: PASS
Follow-up:
- 新建 plan 68 处理 P2/P3 质量改进