FlowManager 模块设计文档
1. 概述
FlowManager 模块是 openEuler Intelligence 框架中负责工作流(Flow)管理的核心模块。当前实现重点覆盖已有 Flow 的读取、更新、删除,以及节点元数据管理、服务管理等功能(独立的“新建 Flow”流程尚未单独提供专用接口)。
1.1 核心组件
- FlowManager: 工作流管理的主要服务类,提供工作流和节点的 CRUD 操作
- FlowServiceManager: 工作流拓扑验证和处理服务
- FlowLoader: 工作流配置文件的加载和保存
- Flow API Router: FastAPI 路由层,提供 RESTful API 接口
2. 数据模型
2.1 核心数据结构
classDiagram
class Flow {
+str name
+str description
+FlowCheckStatus checkStatus
+FlowBasicConfig basicConfig
+FlowError onError
+dict~UUID,Step~ steps
+list~Edge~ edges
}
class Step {
+str node
+str type
+str name
+str description
+PositionItem pos
+dict params
}
class Edge {
+UUID id
+str edge_from
+str edge_to
+EdgeType edge_type
}
class FlowBasicConfig {
+UUID startStep
+UUID endStep
+PositionItem focusPoint
}
class FlowCheckStatus {
+bool debug
+bool connectivity
}
class FlowItem {
+str flow_id
+str name
+str description
+bool enable
+list~NodeItem~ nodes
+list~EdgeItem~ edges
+FlowBasicConfig basic_config
+FlowCheckStatus check_status
}
Flow --> Step
Flow --> Edge
Flow --> FlowBasicConfig
Flow --> FlowCheckStatus
FlowItem --> NodeItem
FlowItem --> EdgeItem
2.2 数据库模型
erDiagram
App ||--o{ Flow : contains
Flow {
string id PK
uuid appId FK
string name
text description
string path
bool debug
bool enabled
datetime updatedAt
}
App {
uuid id PK
string name
string author
}
NodeInfo {
string id PK
uuid serviceId FK
string callId
string name
text description
datetime updatedAt
}
Service {
uuid id PK
string name
string author
datetime updatedAt
}
UserFavorite {
string userId
uuid itemId
string favouriteType
}
AppHashes {
uuid appId FK
string filePath
string hash
}
Service ||--o{ NodeInfo : contains
App ||--o{ AppHashes : tracks
3. 核心功能
3.1 工作流管理流程
sequenceDiagram
participant Client
participant API as Flow Router
participant FM as FlowManager
participant FSM as FlowServiceManager
participant FL as FlowLoader
participant DB as PostgreSQL
participant FS as FileSystem
Note over Client,FS: 获取工作流
Client->>API: GET /api/flow?appId=xxx&flowId=xxx
API->>FM: get_flow_by_app_and_flow_id()
FM->>DB: 查询 Flow 记录
FM->>FL: load(app_id, flow_id)
FL->>FS: 读取 YAML 文件
FL->>FL: 验证基本字段
FL->>FL: 处理边(edges)
FL->>FL: 处理步骤(steps)
FL->>DB: 更新数据库记录
FL-->>FM: 返回 Flow 配置
FM->>FM: 转换为 FlowItem
FM-->>API: 返回 FlowItem
API-->>Client: 返回响应
Note over Client,FS: 更新工作流
Client->>API: PUT /api/flow?appId=xxx&flowId=xxx
API->>FSM: remove_excess_structure_from_flow()
FSM-->>API: 清理后的 FlowItem
API->>FSM: validate_flow_illegal()
FSM->>FSM: 验证节点ID唯一性
FSM->>FSM: 验证边的合法性
FSM-->>API: 验证通过
API->>FSM: validate_flow_connectivity()
FSM->>FSM: BFS 检查连通性
FSM-->>API: 返回连通性状态
API->>FM: put_flow_by_app_and_flow_id()
FM->>FM: 转换 FlowItem 为 Flow
FM->>FL: 检查旧配置
FM->>FM: is_flow_config_equal()
FM->>FL: save(app_id, flow_id, flow)
FL->>FS: 写入 YAML 文件
FL->>DB: 以 appId 为范围删除旧 Flow 记录并写入最新元数据
FL->>DB: 更新 AppHashes
FL-->>FM: 保存成功
FM-->>API: 更新完成
API-->>Client: 返回更新结果
Note: 上述流程不会自动触发向量索引同步,如需同步需在业务层额外调用 _update_vector 或传入 embedding 模型。
3.2 节点和服务管理流程
sequenceDiagram
participant Client
participant API as Flow Router
participant FM as FlowManager
participant DB as PostgreSQL
participant NM as NodeManager
Note over Client,NM: 获取用户服务列表
Client->>API: GET /api/flow/service
API->>FM: get_service_by_user_id(user_id)
FM->>DB: 查询用户收藏的服务
FM->>DB: 查询用户上传的服务
FM->>DB: 去重并查询服务详情
loop 每个服务
FM->>FM: get_node_id_by_service_id()
FM->>DB: 查询服务下的节点
FM->>FM: 格式化节点元数据
end
FM-->>API: 返回服务及节点列表
API-->>Client: 返回响应
Note over Client,NM: 获取节点详细信息
Client->>FM: get_node_by_node_id(node_id)
FM->>DB: 查询节点记录
FM->>NM: get_node_params(node_id)
NM-->>FM: 返回参数 schema
FM->>FM: 创建空槽位(empty slot)
FM->>FM: 提取类型描述
FM-->>Client: 返回节点元数据
3.3 工作流验证流程
flowchart TD
Start([开始验证]) --> ValidateNodes[验证节点ID唯一性]
ValidateNodes --> CheckStart{检查起始节点}
CheckStart -->|存在| CheckEnd{检查终止节点}
CheckStart -->|不存在| Error1[抛出异常: 起始节点不存在]
CheckEnd -->|存在| ValidateEdges[验证边的合法性]
CheckEnd -->|不存在| Error2[抛出异常: 终止节点不存在]
ValidateEdges --> CheckEdgeId{边ID唯一?}
CheckEdgeId -->|否| Error3[抛出异常: 边ID重复]
CheckEdgeId -->|是| CheckSelfLoop{自环?}
CheckSelfLoop -->|是| Error4[抛出异常: 起止节点相同]
CheckSelfLoop -->|否| CheckBranch{分支合法?}
CheckBranch -->|否| Error5[抛出异常: 分支非法]
CheckBranch -->|是| ValidateConn[验证连通性]
ValidateConn --> BFS[BFS遍历图]
BFS --> CheckReachable{所有节点可达?}
CheckReachable -->|否| ConnFalse[连通性=False]
CheckReachable -->|是| CheckEndReachable{终止节点可达?}
CheckEndReachable -->|否| ConnFalse
CheckEndReachable -->|是| CheckOutEdge{非终止节点都有出边?}
CheckOutEdge -->|否| ConnFalse
CheckOutEdge -->|是| ConnTrue[连通性=True]
ConnTrue --> Success([验证成功])
ConnFalse --> Success
Error1 --> End([结束])
Error2 --> End
Error3 --> End
Error4 --> End
Error5 --> End
Success --> End
3.4 工作流删除流程
sequenceDiagram
participant Client
participant API as Flow Router
participant FM as FlowManager
participant FL as FlowLoader
participant DB as PostgreSQL
participant FS as FileSystem
participant AL as AppLoader
Client->>API: DELETE /api/flow?appId=xxx&flowId=xxx
API->>API: 验证用户权限
API->>FM: delete_flow_by_app_and_flow_id()
FM->>FL: delete(app_id, flow_id)
FL->>FS: 检查文件是否存在
FL->>FS: 删除 YAML 文件
FL->>DB: 删除 Flow 记录
FL->>DB: 删除向量数据(仅当调用方向 FlowLoader.delete 传入 embedding_model 时)
FL->>DB: commit
FL-->>FM: 删除成功
FM->>DB: 删除 AppHashes 记录
FM->>AL: read_metadata(app_id)
AL-->>FM: 返回 FlowAppMetadata
FM->>FM: 从 hashes 中移除 flow
FM->>FM: 从 flows 列表中移除
FM->>AL: save(metadata, app_id)
AL->>FS: 更新 metadata.yaml
AL->>DB: 更新数据库
AL-->>FM: 保存成功
FM-->>API: 删除完成
API-->>Client: 返回删除结果
4. API 接口
4.1 接口列表
| 方法 | 路径 | 功能 | 认证 |
|---|---|---|---|
| GET | /api/flow/service |
获取用户可访问的服务及节点列表 | 需要 |
| GET | /api/flow |
获取指定工作流的拓扑结构 | 需要 |
| PUT | /api/flow |
更新工作流拓扑结构 | 需要 |
| DELETE | /api/flow |
删除工作流 | 需要 |
4.2 接口详情
4.2.1 获取服务列表
HTTP 请求:
GET /api/flow/service
Authorization: Bearer <token>
返回示例:
{
"code": 200,
"message": "Node所在Service获取成功",
"result": {
"services": [
{
"serviceId": "00000000-0000-0000-0000-000000000000",
"name": "系统",
"data": [
{
"nodeId": "start",
"callId": "Start",
"name": "开始",
"updatedAt": 1234567890.123
}
],
"createdAt": null
}
]
}
}
4.2.2 获取工作流
HTTP 请求:
GET /api/flow?appId={uuid}&flowId={string}
Authorization: Bearer <token>
返回示例:
{
"code": 200,
"message": "应用的Workflow获取成功",
"result": {
"flow": {
"flowId": "main",
"name": "主工作流",
"description": "这是一个示例工作流",
"enable": true,
"nodes": [...],
"edges": [...],
"basicConfig": {
"startStep": "uuid",
"endStep": "uuid",
"focusPoint": {"x": 0, "y": 0}
},
"checkStatus": {
"debug": false,
"connectivity": true
}
}
}
}
4.2.3 更新工作流
HTTP 请求:
PUT /api/flow?appId={uuid}&flowId={string}
Authorization: Bearer <token>
Content-Type: application/json
{
"flow": {
"flowId": "main",
"name": "主工作流",
"nodes": [...],
"edges": [...],
"basicConfig": {...}
}
}
返回示例:
{
"code": 200,
"message": "应用下流更新成功",
"result": {
"flow": {...}
}
}
4.2.4 删除工作流
HTTP 请求:
DELETE /api/flow?appId={uuid}&flowId={string}
Authorization: Bearer <token>
返回示例:
{
"code": 200,
"message": "应用下流程删除成功",
"result": {
"flowId": "main"
}
}
5. 核心类详解
5.1 FlowManager
位置:apps/services/flow.py
主要方法:
| 方法 | 功能 | 参数 | 返回值 |
|---|---|---|---|
get_flows_by_app_id |
获取应用的所有工作流 | app_id: UUID |
list[FlowInfo] |
get_node_id_by_service_id |
获取服务下的节点列表 | service_id: UUID |
list[NodeMetaDataBase] |
get_service_by_user_id |
获取用户的服务列表 | user_id: str |
list[NodeServiceItem] |
get_node_by_node_id |
获取节点详细信息 | node_id: str |
NodeMetaDataItem |
get_flow_by_app_and_flow_id |
获取工作流详情 | app_id: UUID, flow_id: str |
FlowItem |
put_flow_by_app_and_flow_id |
保存/更新工作流 | app_id: UUID, flow_id: str, flow_item: FlowItem |
None |
delete_flow_by_app_and_flow_id |
删除工作流 | app_id: UUID, flow_id: str |
None |
update_flow_debug_by_app_and_flow_id |
更新调试状态 | app_id: UUID, flow_id: str, debug: bool |
bool |
is_flow_config_equal |
比较两个工作流配置是否相等 | flow_config_1: Flow, flow_config_2: Flow |
bool |
5.2 FlowServiceManager
位置:apps/services/flow_service.py
主要方法:
| 方法 | 功能 | 异常 |
|---|---|---|
remove_excess_structure_from_flow |
移除工作流中的多余结构(无效边) | FlowBranchValidationError |
validate_flow_illegal |
验证工作流是否合法 | FlowNodeValidationError, FlowEdgeValidationError |
validate_flow_connectivity |
验证工作流连通性(BFS) | - |
_validate_node_ids |
验证节点ID唯一性 | FlowNodeValidationError |
_validate_edges |
验证边的合法性 | FlowEdgeValidationError |
5.3 FlowLoader
位置:apps/scheduler/pool/loader/flow.py
主要方法:
| 方法 | 功能 | 异常 |
|---|---|---|
load |
从文件系统加载工作流,并刷新数据库中的基本元数据 | FileNotFoundError, RuntimeError |
save |
保存工作流到文件系统,并在数据库中以“先删后插”方式写入最新元数据 | - |
delete |
删除工作流文件和数据库记录(不传 embedding_model 时不会触发向量清理) | - |
_load_yaml_file |
加载YAML文件 | - |
_validate_basic_fields |
验证基本字段 | - |
_process_edges |
将 YAML 的 from/to/type 转换为内部字段,分支通过 stepId.branchId 字符串表示 |
- |
_process_steps |
处理步骤的类型和文档信息 | ValueError |
_update_db |
同步 FlowInfo 和 AppHashes(覆盖同一 appId 的旧记录) | - |
_update_vector |
预留的向量更新方法(当前调用路径未触发) | - |
实现提示
- FlowLoader 的
save/load都会调用_update_db,该方法会删除同一appId下的旧FlowInfo记录后再写入新记录,调用侧需要在保存多个 Flow 时显式传入完整集合。- 向量数据的增量更新需要外部显式调用
_update_vector或在删除时传入 embedding 模型;默认调用路径不会同步向量索引。
6. 状态管理
6.1 工作流状态
stateDiagram-v2
[*] --> Created: 创建工作流
Created --> Editing: 编辑中
Editing --> Validating: 保存
Validating --> Invalid: 验证失败
Validating --> Valid: 验证成功
Invalid --> Editing: 继续编辑
Valid --> Testing: 开始调试
Testing --> Debugged: 调试成功
Testing --> Valid: 调试失败
Debugged --> Published: 发布
Published --> [*]: 删除
Editing --> [*]: 删除
Valid --> [*]: 删除
note right of Valid
connectivity = true
debug = false
end note
note right of Debugged
connectivity = true
debug = true
end note
6.2 工作流检查状态
- debug:是否经过调试(当工作流内容修改后会重置为 false)
- connectivity:图的连通性检查
- 起始节点到终止节点是否联通
- 除终止节点外所有节点是否都有出边
- 所有节点是否都可从起始节点到达
7. 数据转换
7.1 FlowItem 与 Flow 转换
flowchart LR
subgraph Frontend
FlowItem[FlowItem<br/>前端数据结构]
end
subgraph Backend
Flow[Flow<br/>配置数据结构]
end
subgraph Storage
YAML[YAML文件]
DB[(数据库)]
end
FlowItem -->|put_flow| Flow
Flow -->|get_flow| FlowItem
Flow -->|save| YAML
YAML -->|load| Flow
Flow -->|_update_db| DB
DB -->|query| Flow
转换关键点:
-
NodeItem → Step
step_id作为字典 keynode_id映射到nodecall_id映射到typeparameters["input_parameters"]映射到params
-
EdgeItem → Edge
source_branch+branch_id组合成edge_from(用 "." 连接)target_branch映射到edge_totype转换为EdgeType枚举
-
边格式处理
- 存储:
edge_from = "step_id.branch_id"(有分支时) - 存储:
edge_from = "step_id"(无分支时) - 解析:按 "." 分割,长度为 2 表示有分支
- 存储:
8. 异常处理
8.1 异常类型
| 异常类 | 触发条件 | 处理方式 |
|---|---|---|
FlowNodeValidationError |
节点ID重复、起始/终止节点不存在 | 返回400错误 |
FlowEdgeValidationError |
边ID重复、自环、分支非法 | 返回400错误 |
FlowBranchValidationError |
分支字段缺失/为空、分支重复、非法字符 | 返回400错误 |
ValueError |
应用不存在、工作流不存在、配置错误 | 返回404/500错误 |
FileNotFoundError |
工作流文件不存在 | 返回404错误 |
RuntimeError |
YAML文件格式错误 | 返回500错误 |
8.2 错误处理流程
flowchart TD
Request[API请求] --> Auth{权限验证}
Auth -->|失败| Return403[返回403 Forbidden]
Auth -->|成功| Validate{数据验证}
Validate -->|失败| Return400[返回400 Bad Request]
Validate -->|成功| Process[处理业务逻辑]
Process -->|异常| CatchError{捕获异常}
CatchError -->|FlowValidationError| Return400
CatchError -->|FileNotFoundError| Return404[返回404 Not Found]
CatchError -->|ValueError| Return404
CatchError -->|其他异常| Return500[返回500 Internal Server Error]
Process -->|成功| Return200[返回200 OK]
9. 安全性
- 输入参数使用 Pydantic 模型验证
- 分支ID禁止包含"."等非法字符
- 节点/边ID唯一性检查
- YAML文件格式验证
10. 配置示例
YAML配置文件示例:
name: 示例工作流
description: 这是一个示例工作流配置
checkStatus:
debug: false
connectivity: true
basicConfig:
startStep: a1b2c3d4-e5f6-7890-abcd-ef1234567890
endStep: f1e2d3c4-b5a6-7890-dcba-fe0987654321
focusPoint:
x: 0.0
y: 0.0
onError:
use_llm: true
steps:
a1b2c3d4-e5f6-7890-abcd-ef1234567890:
type: Start
node: start
name: 开始
description: 工作流起始节点
pos:
x: 100.0
y: 100.0
params: {}
b2c3d4e5-f6a7-8901-bcde-f12345678901:
type: ApiCall
node: api_node_123
name: API调用
description: 调用外部API
pos:
x: 300.0
y: 100.0
params:
url: https://api.example.com
method: GET
f1e2d3c4-b5a6-7890-dcba-fe0987654321:
type: End
node: end
name: 结束
description: 工作流结束节点
pos:
x: 500.0
y: 100.0
params: {}
edges:
- id: edge001
edge_from: a1b2c3d4-e5f6-7890-abcd-ef1234567890
edge_to: b2c3d4e5-f6a7-8901-bcde-f12345678901
edge_type: NORMAL
- id: edge002
edge_from: b2c3d4e5-f6a7-8901-bcde-f12345678901
edge_to: f1e2d3c4-b5a6-7890-dcba-fe0987654321
edge_type: NORMAL
11. 文件存储结构
data_dir/
└── semantics/
└── app/
└── {app_id}/
├── metadata.yaml # 应用元数据
└── flow/
├── main.yaml # 主工作流
├── {flow_id}.yaml # 其他工作流
└── ...