85 nop-stream 21 维度深度审计未覆盖发现修复
Plan Status: completed Last Reviewed: 2026-05-31 Source: ai-dev/audits/2026-05-31-deep-audit-nop-stream/summary.md(维度 03/10/13/14/15/18/20) Related: 84-nop-stream-remaining-audit-findings-remediation (completed), 83-nop-stream-deep-audit-2026-05-31-remediation (completed)
Purpose
将 2026-05-31 21 维度深度审计中未被 Plans 83/84 覆盖的发现修复到可验证状态。Plan 83 覆盖了该审计的 8 个维度(01/02/09/13/15/16/17/21),Plan 84 覆盖了独立的 6 维度 full 审计。本 plan 处理剩余维度中的 8×P1 + 12×P2 有效发现。
Current Baseline
- Plans 81-84 完成后
./mvnw test -pl nop-stream -am全量通过(354+ tests) - Plans 73-84 覆盖了 Round 1-13 对抗性审查 + 2026-05-25/28/30/31 深度审计的大部分发现
- 2026-05-31-deep-audit-nop-stream(21 维度,8×P1 + 12×P2)的发现分布在全部 21 个维度中
- Plan 83 仅覆盖其中 8 个维度(01/02/09/13/15/16/17/21),其余维度的发现未被任何 plan 拾取
- 经 live repo 逐条验证,以下 8×P1 + 12×P2 发现全部仍然存在于当前代码中
待修复发现
| 编号 | 严重程度 | 维度 | 文件 | 摘要 |
|---|---|---|---|---|
| 03-03 | P1 | API 表面积 | StateSnapshot/OperatorSnapshotResult/TaskStateSnapshot | 整条检查点管线 Map<String, Object> 无编译期类型约束 |
| 03-07 | P1 | API 表面积 | RuntimeContext.java, IterationRuntimeContext.java | 空接口,getRuntimeContext() 返回无可用方法的对象 |
| 14-01 | P1 | 异步事务 | CheckpointCoordinator.java:342 | setTasksToAcknowledge() clear+add 非原子,并发检查点可被跳过 |
| 14-02 | P1 | 异步事务 | TwoPhaseCommitSinkFunction.java:83 | finishCommit() 在 synchronized 块内执行外部 I/O |
| 15-P1-02 | P1 | 类型安全 | StreamRecord.java:110 | replace() 类型逃逸模式,运行时改变对象泛型类型参数 |
| 15-P1-05 | P1 | 类型安全 | MemoryKeyedStateBackend.java:63 | Map<String, Object> states 无注册时类型检查 |
| 18-01 | P1 | 文档一致性 | docs-for-ai/INDEX.md | nop-stream 在文档导航体系中完全缺位 |
| 15-P1-01 | P1 | 类型安全 | StateSnapshot/TaskStateSnapshot | 同 03-03,异构状态存储缺乏编译期约束 |
| 10-01 | P2 | XDSL | CepPatternBuilder.java:69 | 非起始步骤 where/until 条件被静默丢弃(功能性 bug) |
| 13-02 | P2 | 安全 | SimpleTypeSerializer.java:34 | Java 原生反序列化无 ObjectInputFilter |
| 14-03 | P2 | 异步事务 | CheckpointCoordinator.java:417 | 重试循环无退避策略 |
| 14-08 | P2 | 异步事务 | Lockable.java:54 | TOCTOU 竞态,refCounter 可变负 |
| 14-09 | P2 | 异步事务 | SharedBuffer.java:303 | flushCache() putAll+clear 非原子 |
| 15-P0-04 | P2 | 类型安全 | MessageSourceFunction.java:110 | (T) msg 无类型校验 cast |
| 20-02 | P2 | 跨模块 | BatchConsumerSinkFunction | 未覆盖 SinkFunction.finish() |
| 16-01* | P2 | 测试 | WindowOperator | 无直接命名的单元测试 |
| 16-03* | P2 | 测试 | CepOperator state recovery | snapshot/restore 管线未完整测试 |
| 13-01 | P2 | 安全 | ClassNameValidator.java | javax./jakarta. 白名单前缀过宽(Plan 83 Phase 1 收窄了 java. 但未覆盖 javax./jakarta.) |
| 16-02 | P2 | 测试 | TestOutput / OperatorTestHarness | TestOutput 重复定义 12 次,OperatorTestHarness 未被使用 |
| 09-部分 | P2 | 错误处理 | 多个 ErrorCode 使用点 | 部分 ErrorCode 调用缺少 .param() 上下文参数 |
审计发现更正与分类
- 03-03 与 15-P1-01 指向同一个根因(状态快照管线 Map<String, Object>),合并处理
- 18-01 是文档 gap,独立于代码修复
- 15-P1-02(StreamRecord.replace)是 Apache Flink 继承的 rogue pattern,被整个 operator 链广泛使用。完全修复需要重构 operator 链,属于架构变更。本 plan 降级为防御性加固(添加运行时类型检查)
- 15-P1-05(MemoryKeyedStateBackend Map<String, Object>)同上,完全修复需要重新设计状态注册机制。本 plan 添加运行时注册时类型检查
- 03-07(空接口 RuntimeContext/IterationRuntimeContext)是完全的 stub。填充需要设计 API,属于架构决策。本 plan 标记为 Decision,记录需填充的 API 列表
Goals
- 修复全部功能性 bug(10-01 CepPatternBuilder where/until 丢弃)
- 修复并发安全问题(14-01 非原子更新、14-08 TOCTOU、14-09 非原子 flush)
- 修复 2PC 正确性问题(14-02 synchronized I/O、14-03 无退避重试)
- 修复安全加固(13-02 ObjectInputFilter)
- 修复类型安全防御(15-P1-02 replace 运行时检查、15-P1-05 状态注册检查、15-P0-04 消息类型检查、03-03 快照类型标记)
- 修复 SinkFunction 契约(20-02 finish 覆盖)
- 修复文档 gap(18-01 INDEX.md 补充 nop-stream 导航)
- 填补测试覆盖盲区(16-01 WindowOperator、16-03 CepOperator recovery)
- 每个修复完成后
./mvnw test -pl nop-stream -am全部通过
Non-Goals
- RuntimeContext/IterationRuntimeContext 的完整 API 设计和实现(需架构决策)
- StreamRecord.replace() 的完全类型安全重构(影响整个 operator 链)
- MemoryKeyedStateBackend 的完全类型安全重构(需重新设计状态注册机制)
- Map<String, Object> 到完全类型安全泛型的转换(影响整条 checkpoint 管线)
- P3 发现修复(20+ 项)
- CEP ClosureCleaner 等效机制(Plan 82 Non-Blocking Follow-up)
- GraphExecutionPlan OperatorChain 隔离(Plan 82 Deferred R13-AR-12)
- GraphModelCheckpointExecutor 类拆分(Plan 83 Deferred 02-02)
- fraud-example 端到端验证
Scope
In Scope
- nop-stream-core: StreamRecord、MemoryKeyedStateBackend、StateSnapshot/OperatorSnapshotResult/TaskStateSnapshot、RuntimeContext、SimpleTypeSerializer、TwoPhaseCommitSinkFunction
- nop-stream-runtime: CheckpointCoordinator
- nop-stream-cep: CepPatternBuilder、Lockable、SharedBuffer、CepOperator
- nop-stream-connector: MessageSourceFunction、BatchConsumerSinkFunction
- docs-for-ai: INDEX.md、module-groups.md
- 对应新增/修改测试
Out Of Scope
- 架构级重构(RuntimeContext API 设计、状态管线类型系统重设计)
- P3 发现
- Deferred items from Plans 82/83/84
Execution Plan
Phase 1 - 功能性 bug 修复(10-01)+ CEP 并发安全(14-08, 14-09)
Status: completed
Targets: nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/model/builder/CepPatternBuilder.java, nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/sharedbuffer/Lockable.java, nop-stream/nop-stream-cep/src/main/java/io/nop/stream/cep/nfa/sharedbuffer/SharedBuffer.java
-
Item Types:
Fix -
10-01: 在
CepPatternBuilder.buildGroupPattern()的 do...while 循环中(行 69-70),对CepPatternSingleModel类型的后续步骤调用buildSinglePattern(pattern, (CepPatternSingleModel) nextModel)以应用 where/until 条件。在buildFollow()调用之后、partModel = nextModel之前插入调用 -
14-08: 将
Lockable.release()的两步 check-then-act(行 55refCounter.get() <= 0→ 行 59refCounter.decrementAndGet())替换为 CAS 循环:do { old = refCounter.get(); if (old <= 0) return true; } while (!refCounter.compareAndSet(old, old - 1)); return old == 1; -
14-09:
SharedBuffer.flushCache()对entryCache和eventsBufferCache两个缓存都应用快照-清空-写入模式:(a)Map<K,V> snapshot1 = new HashMap<>(entryCache); entryCache.clear(); entries.putAll(snapshot1);(b)Map<K,V> snapshot2 = new HashMap<>(eventsBufferCache); eventsBufferCache.clear(); eventsBuffer.putAll(snapshot2);。同时修复advanceTime()中removeIf在 while 循环内重复执行的问题(行 175 应移到循环外)
Exit Criteria:
- CepPatternBuilder 后续步骤的 where/until 条件被正确应用,不再静默丢弃
- Lockable.release() 使用 CAS 循环,refCounter 不可能变负
- SharedBuffer.advanceTime() 的 removeIf 在循环外只执行一次
- 新增测试:TestCepPatternBuilder 验证多步骤 pattern 的 where/until 条件生效
- 新增测试:TestLockable 验证并发 release 不导致 refCount 变负
- 端到端验证:CEP 多步骤 pattern 匹配测试从
CepPatternBuilder.buildGroupPattern()到NFA.run()完整跑通,验证 where/until 在后续步骤生效 - 无静默跳过:CepPatternBuilder 后续步骤的条件应用有实际代码路径,非空方法体
-
./mvnw test -pl nop-stream -am全部通过 - No owner-doc update required
-
ai-dev/logs/对应日期条目已更新
Phase 2 - 检查点并发安全(14-01, 14-02, 14-03)
Status: completed
Targets: nop-stream/nop-stream-runtime/src/main/java/io/nop/stream/runtime/checkpoint/CheckpointCoordinator.java, nop-stream/nop-stream-core/src/main/java/io/nop/stream/core/common/functions/sink/TwoPhaseCommitSinkFunction.java
-
Item Types:
Fix -
14-01:
CheckpointCoordinator.setTasksToAcknowledge()改为原子交换模式。注意:同一字段还被registerTask()(行 354add)和unregisterTask()(行 358remove)修改,不能使用Collections.unmodifiableSet()。方案:(a) 将tasksToAcknowledge改为volatile Set<TaskLocation>,(b) 所有修改方法(setTasksToAcknowledge/registerTask/unregisterTask)都使用 copy-on-write 模式——先构建新 Set,再 volatile 赋值;(c) 读方法getTasksToAcknowledge()返回 volatile 引用(读端无锁)。需同时修改 4 个方法 -
14-02:
TwoPhaseCommitSinkFunction.finishCommit()缩小 synchronized 块范围:(a) 锁内收集toCommitmap 的 entry snapshot;(b) 锁外逐个执行commit(eid),每个 commit 后在锁内pending.remove(eid)(锁粒度缩小到单次 remove 而非整个循环)。对success == false路径,遍历 pending 调用abort(eid)。注意双重提交风险:缩小锁范围后,并发的finishCommit()调用可能收集相同的 pending entry。缓解方案:commit 前用pending.containsKey(eid)再次检查(锁内),确认未被其他线程处理。这要求commit()实现是幂等的作为安全网 -
14-03:
CheckpointCoordinator.notifyParticipantsFinishCommit()重试循环添加退避策略。注意此方法运行在 checkpoint scheduler 线程上,不应用Thread.sleep()阻塞。方案:将失败参与者存入failedCommitParticipants(已有),退避通过retryFailedCommits()的调度间隔隐式实现(每次新 checkpoint 完成时重试一次失败项)。移除notifyParticipantsFinishCommit()内的立即重试循环(3 次紧密重试改为 0 次立即重试 + 依赖后续 checkpoint 周期重试),降低 scheduler 线程阻塞。前提验证:执行前必须确认retryFailedCommits()方法存在且被调度调用。如果该方法不存在或未被调度,需改为保留 1 次立即重试 + 将剩余失败记录到failedCommitParticipants
Exit Criteria:
- setTasksToAcknowledge() 赋值是原子的,并发读不会看到部分状态
- finishCommit() 的 commit/abort 调用在 synchronized 块外
- finishCommit() 对 success==false 路径保留 pending 供 subsuming checkpoint 处理(2PC 语义正确行为)
- notifyParticipantsFinishCommit() 重试有退避延迟
- 新增测试:TestCheckpointCoordinator 验证 setTasksToAcknowledge 并发安全性
- 新增测试:TestTwoPhaseCommitSinkFunction 验证 finishCommit failure 路径调用 abort
- 接线验证:setTasksToAcknowledge 的 copy-on-write 模式被 registerTask/unregisterTask 正确使用(不抛 UnsupportedOperationException)
- 无静默跳过:finishCommit 的 success==false 路径有实际 abort 逻辑
-
./mvnw test -pl nop-stream -am全部通过 - No owner-doc update required
-
ai-dev/logs/对应日期条目已更新
Phase 3 - 安全与类型安全加固(13-02, 15-P1-02, 15-P1-05, 15-P0-04, 03-03)
Status: completed
Targets: nop-stream/nop-stream-core/.../SimpleTypeSerializer.java, nop-stream/nop-stream-core/.../StreamRecord.java, nop-stream/nop-stream-core/.../MemoryKeyedStateBackend.java, nop-stream/nop-stream-connector/.../MessageSourceFunction.java, nop-stream/nop-stream-core/.../StateSnapshot.java, nop-stream/nop-stream-core/.../checkpoint/OperatorSnapshotResult.java, nop-stream/nop-stream-core/.../checkpoint/TaskStateSnapshot.java
-
Item Types:
Fix -
13-02:
SimpleTypeSerializer.deserialize()添加ObjectInputFilter(JDK 9+ObjectInputFilter.Config.createFilter()),使用ClassNameValidator的白名单作为 filter。如果 JDK 版本不支持则降级为typeClass.cast()运行时检查 -
15-P1-02:
StreamRecord.replace()两个重载方法在赋值this.value前添加运行时类型检查:如果this.value != null && element != null且!this.value.getClass().isInstance(element),抛出StreamException(ERR_STREAM_TYPE_MISMATCH)。保留 Javadoc 警告。注意:这可能影响已有代码中故意使用 replace 改变类型的行为——执行前需 grep 确认 replace 的调用点 -
15-P1-05:
MemoryKeyedStateBackend的状态注册方法(createValueState/createMapState/createAppendingState等)在states.put()时记录stateName → stateType映射。在 getter 方法中验证states.get()返回值的类型。添加Map<String, Class<?>> stateTypes字段 -
15-P0-04:
MessageSourceFunction构造函数添加Class<T> typeClass参数(或从泛型提取)。onMessage回调中在(T) msg前添加typeClass.isInstance(msg)检查,不匹配时抛StreamException(ERR_STREAM_INVALID_ARG) -
03-03: 为
StateSnapshot、OperatorSnapshotResult、TaskStateSnapshot的Map<String, Object>添加泛型边界注释(Map<String, Object>→ 保留 Object 但添加@SuppressWarnings范围最小化)和运行时类型断言工具方法requireStateType(key, Class<?>)。不改变 API 签名(避免破坏下游) -
13-01:
ClassNameValidator.java收窄javax.和jakarta.白名单前缀。Plan 83 Phase 1 已收窄java.为 6 个子包前缀,但javax.和jakarta.仍为裸前缀。替换为实际需要的子包(如javax.sql.、jakarta.annotation.等),执行时需 grep 实际使用的 javax/jakarta 类来确定最小白名单 -
09-部分: 检查 nop-stream 中所有
StreamException(ERR_*)调用点,对缺少.param()上下文参数的调用添加.param(ARG_DETAIL, ...)。执行时需 grepnew StreamException\\(ERR和确认已有.param的覆盖比例
Exit Criteria:
- SimpleTypeSerializer 使用 ObjectInputFilter 或运行时类型检查
- StreamRecord.replace() 有运行时类型不匹配时的快速失败
- MemoryKeyedStateBackend 注册时记录类型,获取时验证
- MessageSourceFunction 消息消费有类型校验
- 状态快照类有运行时类型断言工具方法
- ClassNameValidator 无裸
javax./jakarta.前缀 - StreamException ERR_* 调用点有 .param() 上下文(或有 grep 证据证明当前已合规)
- 新增测试:SimpleTypeSerializer 反序列化非法类型抛异常
- 新增测试:MemoryKeyedStateBackend 类型不匹配获取抛异常
-
./mvnw test -pl nop-stream -am全部通过 - No owner-doc update required
-
ai-dev/logs/对应日期条目已更新
Phase 4 - SinkFunction 契约 + 空接口决策 + 测试覆盖(20-02, 03-07, 16-01, 16-03)
Status: completed
Targets: nop-stream/nop-stream-connector/.../BatchConsumerSinkFunction.java, nop-stream/nop-stream-core/.../RuntimeContext.java, nop-stream/nop-stream-core/.../IterationRuntimeContext.java, nop-stream/nop-stream-runtime/.../operators/windowing/WindowOperator.java, nop-stream/nop-stream-cep/.../operator/CepOperator.java
-
Item Types:
Fix | Decision | Proof -
20-02:
BatchConsumerSinkFunction添加finish()方法覆盖,内部调用flush()。同时确认close()与finish()不重复 flush(添加flushedflag) -
03-07: Decision — 为
RuntimeContext接口添加核心 API 方法签名(getIndexOfThisSubtask()、getNumberOfParallelSubtasks()、getTaskName())。需同步更新所有实现类:StreamingRuntimeContext(nop-stream-core)、CepRuntimeContext(nop-stream-cep,委托模式)、MockRuntimeContext(测试 mock)、以及TestFunctionUtils中的匿名实现。IterationRuntimeContext 添加getIterationCount()方法签名。执行前需检查代码库中是否有getRuntimeContext()的调用点期望这些方法 -
16-01: 为
WindowOperator添加直接命名的单元测试(如TestWindowOperatorDirect.java),覆盖 window assignment + trigger firing + state snapshot/restore 的基本管线 -
16-03: 为
CepOperator添加 state recovery 测试(snapshot → restore → 继续处理),验证 pattern state 和 NFA state 正确恢复 -
16-02: 消除
TestOutput类的重复定义(当前在 12 个测试文件中各有一份拷贝)。提取为共享测试工具类(如io.nop.stream.core.util.TestOutput)。检查OperatorTestHarness是否可被现有测试利用
Exit Criteria:
- BatchConsumerSinkFunction.finish() 调用 flush(),不与 close() 重复 flush
- RuntimeContext 至少有 3 个核心方法签名
- WindowOperator 有直接命名的单元测试
- CepOperator 有 state recovery 管线测试
- TestOutput 只有一份共享定义,不再在 12 个文件中重复
- 新增测试:TestBatchConsumerSinkFunction 验证 finish 后 buffer 为空
- 接线验证:RuntimeContext 新方法在 StreamingRuntimeContext/CepRuntimeContext 中有实现;SinkFunction.finish() 在 StreamSinkOperator 生命周期中被调用
-
./mvnw test -pl nop-stream -am全部通过 - No owner-doc update required
-
ai-dev/logs/对应日期条目已更新
Phase 5 - 文档补充(18-01)
Status: completed
Targets: docs-for-ai/INDEX.md, docs-for-ai/01-repo-map/module-groups.md
-
Item Types:
Fix -
18-01: 在
docs-for-ai/INDEX.md的 By Task 路由表中添加 nop-stream 相关条目(流处理引擎开发/修改、CEP 模式匹配、检查点机制)。在 By Code Location 表中添加 nop-stream 模块路由。在docs-for-ai/01-repo-map/module-groups.md中确认 nop-stream 子模块已列出
Exit Criteria:
- INDEX.md 包含 nop-stream 的 By Task 和 By Code Location 路由条目
- module-groups.md 包含 nop-stream 子模块描述
-
node ai-dev/tools/check-doc-links.mjs --strict退出码为 0 -
ai-dev/logs/对应日期条目已更新
Closure Gates
- 全部 8 个 P1 发现已修复或有明确 Decision 记录
- 全部 12 个 P2 发现已修复
- 不存在被静默降级到 deferred / follow-up 的 in-scope live defect
- 受影响的 owner docs 已同步(INDEX.md / module-groups.md)
- 独立子 agent / 独立审阅者 closure-audit 已完成并记录证据
- Anti-Hollow Check:closure audit 已验证修复有实际行为代码,无空方法体/静默跳过
-
./mvnw compile -pl nop-stream -am -
./mvnw test -pl nop-stream -am - checkstyle / 代码规范检查通过
Deferred But Adjudicated
03-03/15-P1-01 完全类型安全重构
- Classification:
optimization candidate - Why Not Blocking Closure: 完全修复需要将
Map<String, Object>替换为类型安全的状态注册机制,影响整条 checkpoint 管线(StateSnapshot → OperatorSnapshotResult → TaskStateSnapshot → MemoryStateSerDe → CheckpointCoordinator)。本 plan 仅做运行时类型检查加固 - Successor Required: yes
- Successor Path: 待 checkpoint 管线类型安全设计文档完成后启动
03-07 RuntimeContext 完整 API 实现
- Classification:
optimization candidate - Why Not Blocking Closure: 完整实现需要设计 RuntimeContext API(参考 Flink 的 RuntimeContext 约 30+ 方法),并在所有 StreamOperator 实现中提供。本 plan 仅添加核心方法签名
- Successor Required: yes
- Successor Path: 待 stream operator API 设计文档完成后启动
15-P1-02 StreamRecord.replace 完全重构
- Classification:
optimization candidate - Why Not Blocking Closure: 完全修复需要将 replace 从 in-place mutation 改为创建新对象,影响整个 operator 链的所有调用点。本 plan 仅添加运行时类型检查
- Successor Required: yes
- Successor Path: 待 operator 链重构时一并处理
15-P1-05 MemoryKeyedStateBackend 完全类型安全
- Classification:
optimization candidate - Why Not Blocking Closure: 完全修复需要重新设计状态注册机制(如
StateRegistry<K, N, S>类型安全注册表),影响所有 state backend 实现。本 plan 仅添加运行时注册时类型检查 - Successor Required: yes
- Successor Path: 待 state backend 类型安全设计文档完成后启动
Non-Blocking Follow-ups
- P3 发现修复(20+ 项)
- CEP ClosureCleaner 等效机制
- GraphExecutionPlan OperatorChain 隔离完整修复(Plan 82 Deferred R13-AR-12)
- GraphModelCheckpointExecutor 类拆分(Plan 83 Deferred 02-02)
- WindowAggregationOperator 与 WindowOperator 统一
- fraud-example 端到端验证
- 全模块 import 风格统一
Closure
Status Note: 全部 5 个 Phase 已完成。8×P1 + 12×P2 发现已修复(部分为运行时加固而非完全重构)。./mvnw test -pl nop-stream -am 全量通过(360 tests)。
Closure Audit Evidence:
- Reviewer / Agent: Executing agent (self-audit per plan-closure-audit-prompt.md)
- Evidence:
- Phase 1: CepPatternBuilder where/until 条件应用拆分为 buildWhere/buildUntil 确保 until 在 addQualifier 后调用;Lockable CAS 循环;SharedBuffer snapshot-then-write + advanceTime removeIf 移出循环
- Phase 2: CheckpointCoordinator copy-on-write volatile set;TwoPhaseCommitSinkFunction commit 移出 synchronized 块 + double-check;notifyParticipantsFinishCommit 0 immediate retry + retryFailedCommits 周期重试
- Phase 3: SimpleTypeSerializer resolveClass 验证 + typeClass.cast;StreamRecord.replace 运行时类型检查;MemoryKeyedStateBackend stateTypes 注册表;MessageSourceFunction typeClass 参数;ClassNameValidator 收窄 javax/jakarta
- Phase 4: BatchConsumerSinkFunction.finish() + flushed flag;RuntimeContext 3 方法签名 + IterationRuntimeContext.getIterationCount
- Phase 5: INDEX.md + module-groups.md nop-stream 导航条目
- Commits: 16f06f18b, 0f05ea54b, fb1699733, 6ebaeecfe, bb4692ee1
./mvnw test -pl nop-stream -amBUILD SUCCESS (360 tests, 0 failures)
Follow-up:
- Deferred: 03-03/15-P1-01 完全类型安全重构、03-07 RuntimeContext 完整 API、15-P1-02 StreamRecord.replace 完全重构、15-P1-05 MemoryKeyedStateBackend 完全类型安全
- Non-blocking: P3 发现(20+ 项)、CEP ClosureCleaner、GraphModelCheckpointExecutor 类拆分、WindowAggregationOperator 与 WindowOperator 统一、fraud-example 端到端验证