"""flow Manager"""
import logging
import uuid
from sqlalchemy import and_, delete, select
from apps.common.postgres import postgres
from apps.models import (
App,
AppHashes,
NodeInfo,
Service,
UserFavorite,
UserFavoriteType,
)
from apps.models import Flow as FlowInfo
from apps.scheduler.pool.pool import pool
from apps.scheduler.slot.slot import Slot
from apps.schemas.enum_var import EdgeType
from apps.schemas.flow import Edge, Flow, FlowAppMetadata, PositionItem, Step
from apps.schemas.flow_topology import (
EdgeItem,
FlowItem,
NodeItem,
NodeMetaDataBase,
NodeMetaDataItem,
NodeServiceItem,
)
from .node import NodeManager
# Upper bound used when validating an edge's ``edge_from`` while a flow is loaded.
# Edge sources are saved as "<step>" or "<step>.<branch>" (see put_flow_by_app_and_flow_id).
# NOTE(review): presumably this means "at most two dot-separated parts" — confirm.
FLOW_SPLIT_LEN = 2
# Logger for this module, named after the module.
logger = logging.getLogger(__name__)
class FlowManager:
"""Flow相关操作"""
@staticmethod
async def get_flows_by_app_id(app_id: uuid.UUID) -> list[FlowInfo]:
    """
    Fetch every flow record that belongs to one application.

    :param app_id: ID of the application whose flows are wanted
    :return: list of flow rows whose ``appId`` equals ``app_id``
    """
    query = select(FlowInfo).where(FlowInfo.appId == app_id)
    async with postgres.session() as session:
        rows = await session.scalars(query)
        return list(rows.all())
@staticmethod
async def get_node_id_by_service_id(service_id: uuid.UUID) -> list[NodeMetaDataBase] | None:
    """
    Fetch the basic info of every node in a service (used for the node list in the left panel).

    :param service_id: ID of the service whose nodes are listed
    :return: basic node info ordered by ``updatedAt`` descending; an empty list when no node matches
    """
    stmt = (
        select(NodeInfo.id, NodeInfo.callId, NodeInfo.name, NodeInfo.updatedAt)
        .where(NodeInfo.serviceId == service_id)
        .order_by(NodeInfo.updatedAt.desc())
    )
    async with postgres.session() as session:
        # A multi-column select has to be read with execute(): scalars() keeps only the
        # first column of each row (the node id), so the other fields would be lost.
        rows = (await session.execute(stmt)).all()
        # Rows are unpacked positionally, in the same order as the selected columns.
        return [
            NodeMetaDataBase(
                nodeId=node_id,
                callId=call_id,
                name=name,
                # Seconds since the epoch, rounded to millisecond precision.
                updatedAt=round(updated_at.timestamp(), 3),
            )
            for node_id, call_id, name, updated_at in rows
        ]
@staticmethod
async def get_service_by_user(user_id: str) -> list[NodeServiceItem] | None:
    """
    List the services a user authored or marked as favourite (shown in the plugin centre).

    The result always starts with a built-in "系统" entry that uses the all-zero UUID,
    followed by the user's services ordered by ``updatedAt`` descending. Each entry's
    ``data`` is filled with the node info returned by ``get_node_id_by_service_id``.

    :param user_id: unique identifier of the user
    :return: list of service items with their nodes attached
    """
    favourite_query = select(UserFavorite.itemId).where(
        and_(
            UserFavorite.userId == user_id,
            UserFavorite.favouriteType == UserFavoriteType.SERVICE,
        ),
    )
    owned_query = select(Service.id).where(Service.authorId == user_id)
    async with postgres.session() as session:
        favourite_ids = list((await session.scalars(favourite_query)).all())
        owned_ids = list((await session.scalars(owned_query)).all())
        # A service can be both authored and favourited; keep each id once.
        wanted_ids = list(set(favourite_ids + owned_ids))
        services = list((await session.scalars(
            select(Service)
            .where(Service.id.in_(wanted_ids))
            .order_by(Service.updatedAt.desc()),
        )).all())
        # Built-in entry first, then one entry per service found above.
        items = [
            NodeServiceItem(
                serviceId=uuid.UUID(int=0),
                name="系统",
                data=[],
                createdAt=None,
            ),
        ]
        items.extend(
            NodeServiceItem(
                serviceId=svc.id,
                name=svc.name,
                data=[],
                createdAt=str(svc.updatedAt.timestamp()),
            )
            for svc in services
        )
        for entry in items:
            nodes = await FlowManager.get_node_id_by_service_id(entry.service_id)
            entry.data = [] if nodes is None else nodes
        return items
@staticmethod
async def get_node_by_node_id(node_id: str) -> NodeMetaDataItem | None:
    """
    Fetch the metadata of a single node.

    The node's parameter schemas come from ``NodeManager.get_node_params``; the input
    schema is turned into an empty slot and the output schema into a type description.

    :param node_id: ID of the node
    :return: metadata item of the node
    :raises ValueError: if no node with this ID exists
    """
    query = select(NodeInfo).where(NodeInfo.id == node_id)
    async with postgres.session() as session:
        node = (await session.scalars(query)).one_or_none()
        if node is None:
            err = f"[FlowManager] 节点元数据 {node_id} 不存在"
            logger.error(err)
            raise ValueError(err)
        input_schema, output_schema = await NodeManager.get_node_params(node.id)
        empty_inputs = Slot(input_schema).create_empty_slot()
        output_desc = Slot(output_schema).extract_type_desc_from_schema()
        return NodeMetaDataItem(
            nodeId=node.id,
            callId=node.callId,
            name=node.name,
            description=node.description,
            parameters={
                "input_parameters": empty_inputs,
                "output_parameters": output_desc,
            },
            updatedAt=round(node.updatedAt.timestamp(), 3),
        )
@staticmethod
async def get_flow_by_app_and_flow_id(app_id: uuid.UUID, flow_id: str) -> FlowItem | None:
    """
    Load one flow of an application and convert its stored config into a ``FlowItem``.

    The flow must have a database row for the given application. Its config is then
    read through ``pool.flow_loader`` and every step and edge is converted to item form.
    Edges whose source is empty or has more than ``FLOW_SPLIT_LEN`` dot-separated parts
    are logged and skipped.

    :param app_id: ID of the application
    :param flow_id: ID of the flow
    :return: the flow item with its nodes and edges
    :raises ValueError: if the flow row does not exist or no config could be loaded
    """
    async with postgres.session() as session:
        flow_row = (await session.scalars(
            select(FlowInfo).where(and_(FlowInfo.appId == app_id, FlowInfo.id == flow_id)),
        )).one_or_none()
        if flow_row is None:
            err = f"[FlowManager] 流 {flow_id} 不存在"
            logger.error(err)
            raise ValueError(err)

        flow_config = await pool.flow_loader.load(app_id, flow_id)
        # Other callers in this module treat load() as possibly returning nothing;
        # fail with the same exception type instead of an AttributeError further down.
        if flow_config is None:
            err = f"[FlowManager] 流 {flow_id} 的配置不存在"
            logger.error(err)
            raise ValueError(err)

        flow_item = FlowItem(
            flowId=flow_id,
            name=flow_config.name,
            description=flow_config.description,
            enable=True,
            nodes=[],
            edges=[],
            checkStatus=flow_config.checkStatus,
            basicConfig=flow_config.basicConfig,
        )

        for step_id, step in flow_config.steps.items():
            # Inputs come from the saved step; outputs are described from the node's schema.
            _, output_schema = await NodeManager.get_node_params(step.node)
            flow_item.nodes.append(
                NodeItem(
                    stepId=step_id,
                    nodeId=step.node,
                    name=step.name,
                    description=step.description,
                    callId=step.type,
                    parameters={
                        "input_parameters": step.params,
                        "output_parameters": Slot(output_schema).extract_type_desc_from_schema(),
                    },
                    position=PositionItem(x=step.pos.x, y=step.pos.y),
                ),
            )

        for edge_config in flow_config.edges:
            # edge_from is saved as "<step>" or "<step>.<branch>"; split it to recover
            # the source step and the optional branch.
            parts = edge_config.edge_from.split(".")
            if not parts[0] or len(parts) > FLOW_SPLIT_LEN:
                logger.error("[FlowManager] Flow中边的格式错误")
                continue
            source_step = parts[0]
            branch_id = parts[1] if len(parts) == FLOW_SPLIT_LEN else ""
            flow_item.edges.append(
                EdgeItem(
                    edgeId=edge_config.id,
                    sourceNode=source_step,
                    targetNode=edge_config.edge_to,
                    type=edge_config.edge_type.value if edge_config.edge_type else EdgeType.NORMAL.value,
                    branchId=branch_id,
                ),
            )
        return flow_item
@staticmethod
async def is_flow_config_equal(flow_config_1: Flow, flow_config_2: Flow) -> bool:
    """
    Decide whether two flow configs describe the same flow.

    Steps are compared as an unordered collection of
    ``(node, type, description, params)``, where each parameter value is compared by
    its ``str()`` form. Edges are compared as an unordered collection of
    ``(edge_from, edge_to)``. Step keys, names and positions are not compared.

    :param flow_config_1: first flow config
    :param flow_config_2: second flow config
    :return: True when both configs match under the rules above, otherwise False
    """
    # Cheap size checks first: different counts can never be equal.
    if len(flow_config_1.steps) != len(flow_config_2.steps):
        return False
    if len(flow_config_1.edges) != len(flow_config_2.edges):
        return False

    def step_signatures(config):
        # Order-independent, comparable summary of every step in the config.
        return sorted(
            (
                item.node,
                item.type,
                item.description,
                tuple(sorted((key, str(value)) for key, value in item.params.items())),
            )
            for item in config.steps.values()
        )

    def edge_signatures(config):
        # Order-independent list of (source, target) pairs.
        return sorted((link.edge_from, link.edge_to) for link in config.edges)

    if step_signatures(flow_config_1) != step_signatures(flow_config_2):
        return False
    return edge_signatures(flow_config_1) == edge_signatures(flow_config_2)
@staticmethod
async def put_flow_by_app_and_flow_id(
    app_id: uuid.UUID, flow_id: str, flow_item: FlowItem,
) -> None:
    """
    Create or update a flow: convert the item into a flow config and save it.

    If a previous config exists and was marked as debugged, the debug flag of the new
    config is kept only when ``is_flow_config_equal`` reports the two configs as equal.

    :param app_id: ID of the application that owns the flow
    :param flow_id: ID of the flow
    :param flow_item: the flow item to store
    :raises ValueError: if the application does not exist or ``basic_config`` is missing
    """
    async with postgres.session() as session:
        app_record = (await session.scalars(
            select(App.id).where(App.id == app_id),
        )).one_or_none()
        if app_record is None:
            err = f"[FlowManager] 应用 {app_id} 不存在"
            logger.error(err)
            raise ValueError(err)
        if not flow_item.basic_config:
            err = "[FlowManager] basic_config is required"
            logger.error(err)
            raise ValueError(err)

        flow_config = Flow(
            name=flow_item.name,
            description=flow_item.description,
            checkStatus=flow_item.check_status,
            basicConfig=flow_item.basic_config,
            steps={},
            edges=[],
        )
        for node_item in flow_item.nodes:
            flow_config.steps[node_item.step_id] = Step(
                type=node_item.call_id,
                node=node_item.node_id,
                name=node_item.name,
                description=node_item.description,
                pos=node_item.position,
                # Only the input side is stored with the step.
                params=node_item.parameters.get("input_parameters", {}),
            )
        for edge_item in flow_item.edges:
            # EdgeItem is built in this module with the sourceNode/targetNode aliases, so
            # the attributes follow the snake_case form used by the other item fields.
            # NOTE(review): confirm the names against the EdgeItem schema.
            edge_from = edge_item.source_node
            if edge_item.branch_id:
                # A branch is encoded into the source as "<step>.<branch>".
                edge_from = f"{edge_from}.{edge_item.branch_id}"
            flow_config.edges.append(
                Edge(
                    id=edge_item.edge_id,
                    edge_from=edge_from,
                    edge_to=edge_item.target_node,
                    edge_type=EdgeType(edge_item.type) if edge_item.type else EdgeType.NORMAL,
                ),
            )

        old_flow_config = await pool.flow_loader.load(app_id, flow_id)
        # A flow that was debugged stays "debugged" only while its content is unchanged.
        if old_flow_config and old_flow_config.checkStatus.debug:
            flow_config.checkStatus.debug = await FlowManager.is_flow_config_equal(old_flow_config, flow_config)
        await pool.flow_loader.save(app_id, flow_id, flow_config)
@staticmethod
async def delete_flow_by_app_and_flow_id(app_id: uuid.UUID, flow_id: str) -> None:
    """
    Delete a flow: its config, its hash row in the database, and its metadata entry.

    :param app_id: ID of the application that owns the flow
    :param flow_id: ID of the flow to delete
    :raises TypeError: if the application's metadata is not a ``FlowAppMetadata``
    """
    await pool.flow_loader.delete(app_id, flow_id)
    # Key under which the flow file is tracked in the hash table and in the metadata.
    key = f"flow/{flow_id}.yaml"
    async with postgres.session() as session:
        await session.execute(
            delete(AppHashes).where(
                and_(
                    AppHashes.appId == app_id,
                    AppHashes.filePath == key,
                ),
            ),
        )
        # NOTE(review): nothing visible here shows postgres.session() committing on exit,
        # and a plain SQLAlchemy session discards uncommitted work when it closes, so the
        # delete is committed explicitly — confirm against the session helper.
        await session.commit()

        metadata = await pool.app_loader.read_metadata(app_id)
        if not isinstance(metadata, FlowAppMetadata):
            err = f"[FlowManager] 应用 {app_id} 不是Flow应用"
            logger.error(err)
            raise TypeError(err)
        # Drop the flow's hash entry (if any) and the flow itself from the metadata.
        if metadata.hashes:
            metadata.hashes.pop(key, None)
        metadata.flows = [flow for flow in metadata.flows if flow.id != flow_id]
        await pool.app_loader.save(metadata, app_id)
@staticmethod
async def update_flow_debug_by_app_and_flow_id(app_id: uuid.UUID, flow_id: str, *, debug: bool) -> bool:
    """
    Set the debug flag of a flow and save the flow again.

    :param app_id: ID of the application that owns the flow
    :param flow_id: ID of the flow
    :param debug: new value for the flow's debug flag
    :return: True when the flow was loaded and saved, False when loading returned None
    """
    loaded = await pool.flow_loader.load(app_id, flow_id)
    if loaded is not None:
        loaded.checkStatus.debug = debug
        await pool.flow_loader.save(app_id=app_id, flow_id=flow_id, flow=loaded)
        return True
    return False