import json
import uuid
import time
import random
from typing import Any, Callable
from minio import Minio
from fastapi import status
from openjiuwen.core.common.logging import logger
from pydantic import ValidationError
from openjiuwen_studio.core.manager.internal.workflow import WorkflowResponseUpdate, WorkflowResponsePublish
from openjiuwen_studio.core.manager.login_manager.space import check_user_space
from openjiuwen_studio.core.manager.utils.utils import Version, check_version
from openjiuwen_studio.core.utils.exception import log_exception
from openjiuwen_studio.core.config import settings
from openjiuwen_studio.models.workflow import WorkflowBaseDBPd, WorkflowPublishDBPd
from openjiuwen_studio.models.agent import AgentBaseDB, AgentPublishDB
from openjiuwen_studio.schemas.workflow import WorkflowBase, WorkflowSave, WorkflowResponseSave, \
WorkflowList, WorkflowResponse, WorkflowResponseList, WorkflowPublish, \
WorkflowBaseResponse, WorkflowUpdate, WorkflowId, WorkflowSearchRequest, WorkflowCreate, \
WorkflowSearchResponse, WorkflowVersionListRequest, WorkflowVersionInfo, WorkflowVersionListResponse
import openjiuwen_studio.core.manager.convertor.workflow as convert
from openjiuwen_studio.schemas.common import ResponseModel
from openjiuwen_studio.core.manager.repositories.workflow_repository import workflow_repository
from openjiuwen_studio.core.manager.repositories.jiuwen_base_repository import get_db_jw, JiuwenBaseRepository
from openjiuwen_studio.core.common.dsl import ComponentType
from openjiuwen_studio.core.manager.workflow_tag import create_workflow_tags, get_workflow_tags, update_workflow_tags
from openjiuwen_studio.core.database import milliseconds
from openjiuwen_studio.schemas.space import SpaceAWPQuery
from openjiuwen_studio.core.manager.reference_extractor import extract_workflow_references, \
check_referenced_dependencies
from openjiuwen_studio.core.manager.repositories.reference_repository import reference_repository
from openjiuwen_studio.core.manager.repositories.prompt_relation_repository import prompt_relation_repository
from openjiuwen_studio.core.common.exceptions import BaseError
from openjiuwen_studio.core.common.exceptions import JiuWenComponentException
from openjiuwen_studio.core.manager.model_manager.managers.model_config_manager import ModelConfigManager
from openjiuwen_studio.core.database import SessionLocal
from openjiuwen_studio.schemas.model_config import ModelParameters
from openjiuwen_studio.core.executor.workflow.workflow import Workflow as ExecutableWorkflow
random_id = ''.join(random.choice('0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_') for _ in range(5))
DEFAULT_WORKFLOW_TEXTS_ZH = {
"start_title": "开始",
"end_title": "结束",
"query_default": "你好,请帮我分析一下这个问题。"
}
DEFAULT_WORKFLOW_TEXTS_EN = {
"start_title": "Start",
"end_title": "End",
"query_default": "Hello, please help me analyze this question."
}
def get_default_workflow_schema():
"""获取默认工作流schema,根据当前语言返回对应文本"""
from openjiuwen_studio.core.common.language_thread_context import get_language
language = get_language()
if language in ("zh-cn", "zh"):
texts = DEFAULT_WORKFLOW_TEXTS_ZH
else:
texts = DEFAULT_WORKFLOW_TEXTS_EN
return {
"nodes": [
{
"id": f"start_{random_id}",
"type": "1",
"meta": {
"position": {
"x": 180,
"y": 36
}
},
"data": {
"title": texts["start_title"],
"outputs": {
"type": "object",
"properties": {
"query": {
"type": "string",
"default": texts["query_default"]
}
}
}
}
},
{
"id": f"end_{random_id}",
"type": "2",
"meta": {
"position": {
"x": 1100,
"y": 36
}
},
"data": {
"title": texts["end_title"],
"inputs": {
"inputParameters": {
"result": {}
}
},
"streaming": False
}
}
],
"edges": []
}
def with_exception_handling(func: Callable) -> Callable:
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except ValidationError as e:
log_exception(e)
return ResponseModel(
code=status.HTTP_400_BAD_REQUEST,
message=str(e)
)
except BaseError as e:
log_exception(e)
if isinstance(e, JiuWenComponentException):
type_name_map = {
ComponentType.COMPONENT_TYPE_START: "START",
ComponentType.COMPONENT_TYPE_LLM: "LLM",
ComponentType.COMPONENT_TYPE_END: "END",
ComponentType.COMPONENT_TYPE_IF: "IF",
ComponentType.COMPONENT_TYPE_LOOP: "LOOP",
ComponentType.COMPONENT_TYPE_INPUT: "INPUT",
ComponentType.COMPONENT_TYPE_OUTPUT: "OUTPUT",
ComponentType.COMPONENT_TYPE_QUESTION: "QUESTION",
ComponentType.COMPONENT_TYPE_CONTINUE: "CONTINUE",
ComponentType.COMPONENT_TYPE_BREAK: "BREAK",
ComponentType.COMPONENT_TYPE_TEXT_EDITOR: "TEXT_EDITOR",
ComponentType.COMPONENT_TYPE_INTENT: "INTENT",
ComponentType.COMPONENT_TYPE_SUB_WORKFLOW: "SUB_WORKFLOW",
ComponentType.COMPONENT_TYPE_EMPTY_START: "EMPTY_START",
ComponentType.COMPONENT_TYPE_EMPTY_END: "EMPTY_END",
ComponentType.COMPONENT_TYPE_CODE: "CODE",
ComponentType.COMPONENT_TYPE_VARIABLE_MERGE: "VARIABLE_MERGE",
ComponentType.COMPONENT_TYPE_SET_VARIABLE: "SET_VARIABLE",
ComponentType.COMPONENT_TYPE_PLUGIN: "PLUGIN",
}
type_name = type_name_map.get(getattr(e, "component_type", 0), str(getattr(e, "component_type", "")))
formatted_message = f"{type_name} component [{getattr(e, 'component_id', '')}]: {e.message}"
return ResponseModel(
code=status.HTTP_400_BAD_REQUEST,
message=formatted_message,
data={
"error_code": getattr(e, "code", -1),
"component_id": e.component_id,
"component_type": e.component_type,
"error_stage": e.error_stage,
}
)
else:
return ResponseModel(
code=status.HTTP_400_BAD_REQUEST,
message=e.message,
data={"error_code": getattr(e, "code", -1)}
)
except Exception as e:
log_exception(e)
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=str(e)
)
return wrapper
def _process_workflow_data_list(data_list: list) -> list[WorkflowResponse]:
"""
处理工作流数据列表的通用函数
获取标签并构建 WorkflowResponse 对象
Args:
data_list: 从数据库获取的工作流数据列表
Returns:
处理后的 WorkflowResponse 对象列表
"""
res_list: list[WorkflowResponse] = []
for d in data_list:
if not isinstance(d, dict):
logger.warning(f"Invalid workflow data item: {d}, skipping")
continue
logger.debug(f"Processing workflow: {d.get('workflow_id')}")
if not d.get("workflow_id") or not d.get("space_id"):
logger.warning(f"Missing required fields in workflow data: {d}")
continue
workflow_version = d.get("workflow_version") or "draft"
try:
workflow_tags_result = get_workflow_tags(d.get("workflow_id"), d.get("space_id"), workflow_version)
workflow_tags = workflow_tags_result if workflow_tags_result else []
except Exception as e:
logger.error(f"Error getting tags for workflow {d.get('workflow_id')}: {str(e)}")
workflow_tags = []
workflow_id = d.get("workflow_id")
name = d.get("name", "Unnamed Workflow")
desc = d.get("desc", "")
space_id = d.get("space_id")
create_time = d.get("create_time")
update_time = d.get("update_time")
input_parameters = d.get("input_parameters")
output_parameters = d.get("output_parameters")
try:
wf = WorkflowResponse(
workflow_id=workflow_id,
name=name,
desc=desc,
url=d.get("url"),
icon_uri=d.get("icon_uri"),
create_time=create_time,
update_time=update_time,
space_id=space_id,
input_parameters=input_parameters,
output_parameters=output_parameters,
tags=workflow_tags
)
res_list.append(wf)
except Exception as e:
logger.error(f"Error creating WorkflowResponse for {workflow_id}: {str(e)}")
continue
return res_list
def _validate_and_normalize_pagination_data(data: dict, default_page: int = 1, default_page_size: int = 10) -> tuple:
"""
验证和标准化分页数据的通用函数
Args:
data: 包含分页数据的字典
default_page: 默认页码
default_page_size: 默认页面大小
Returns:
(total, page, page_size, total_pages) 的元组
"""
total = data.get("total", 0)
page = data.get("page", default_page)
page_size = data.get("page_size", default_page_size)
total_pages = data.get("total_pages", 1)
if not isinstance(total, int) or total < 0:
total = 0
if not isinstance(page, int) or page < 1:
page = default_page
if not isinstance(page_size, int) or page_size < 1:
page_size = default_page_size
if not isinstance(total_pages, int) or total_pages < 1:
total_pages = max(1, (total + page_size - 1) // page_size)
return total, page, page_size, total_pages
@with_exception_handling
def workflow_create(
req: WorkflowCreate,
current_user: dict
) -> ResponseModel:
"""创建新的工作流"""
start_time = time.time()
logger.info(f"Starting workflow creation request for user {current_user.get('user_id', 'unknown')}")
_ = check_user_space(req.space_id, current_user)
workflow_id = str(uuid.uuid4())
current_time = milliseconds()
workflow_schema = get_default_workflow_schema()
inputs, outputs = convert.extract_inputs_and_outputs_from_canvas(workflow_schema)
workflow = WorkflowBaseDBPd(
workflow_id=workflow_id,
name=req.name,
desc=req.desc,
url="test",
icon_uri=req.icon_uri,
space_id=req.space_id,
create_time=current_time,
update_time=current_time,
schema=json.dumps(workflow_schema),
input_parameters=inputs,
output_parameters=outputs
)
logger.debug(f"create workflow info: {workflow}")
logger.info(f"Creating workflow: {workflow.workflow_id} in space {workflow.space_id}")
create_result = workflow_repository.workflow_create(workflow)
logger.debug(f"create workflow info into db result: {create_result}")
if create_result.code == status.HTTP_200_OK:
logger.info(f"Workflow created successfully: {workflow.workflow_id}")
if create_result.code != status.HTTP_200_OK:
return ResponseModel(
code=create_result.code,
message=create_result.message,
)
processed_tags = []
if hasattr(req, 'tags') and req.tags:
processed_tags = create_workflow_tags(workflow_id, req.space_id, req.tags, current_user)
logger.info(f"Processed tags for workflow {workflow_id}: {processed_tags}")
workflow_dict = workflow.model_dump()
workflow_dict['tags'] = processed_tags
end_time = time.time()
execution_time = end_time - start_time
logger.info(f"Workflow creation completed successfully in {execution_time:.3f}s - ID: {workflow.workflow_id}")
return ResponseModel(
code=status.HTTP_200_OK,
message="create workflow success",
data={"workflow": workflow_dict}
)
def _sync_model_config_in_schema(schema: dict, space_id: str) -> bool:
"""
Sync model configuration (name, type) in workflow schema with latest data from DB.
Recursively traverses nodes and blocks.
Returns:
bool: True if any changes were made, False otherwise
"""
has_changes = False
def _traverse_and_update(nodes, model_mgr):
nonlocal has_changes
if not nodes:
return
for node in nodes:
try:
if "data" in node and "inputs" in node["data"]:
inputs = node["data"]["inputs"]
if "llmParam" in inputs:
llm_param = inputs["llmParam"]
if "model" in llm_param:
model_info = llm_param["model"]
model_id = model_info.get("id")
if model_id:
try:
model_config = model_mgr.get_config_by_id(int(model_id), space_id)
if model_config:
if model_info.get("name") != model_config.name or \
model_info.get("type") != model_config.model_type:
model_info["name"] = model_config.name
model_info["type"] = model_config.model_type
has_changes = True
logger.debug(f"Synced model {model_id} for node {node.get('id')}")
except Exception as e:
logger.warning(
"Failed to sync model config for node "
f"{node.get('id')}, model_id {model_id}: {e}"
)
except Exception as e:
logger.warning(f"Error processing node {node.get('id')}: {e}")
if "blocks" in node and isinstance(node["blocks"], list):
_traverse_and_update(node["blocks"], model_mgr)
try:
with get_db_jw() as db:
model_mgr = ModelConfigManager(db)
if "nodes" in schema and isinstance(schema["nodes"], list):
_traverse_and_update(schema["nodes"], model_mgr)
except Exception as e:
logger.error(f"Error in _sync_model_config_in_schema: {e}")
return has_changes
@with_exception_handling
def workflow_canvas(
req: WorkflowId,
current_user: dict
) -> ResponseModel:
"""获取工作流画布数据"""
_ = check_user_space(req.space_id, current_user)
canvas_result = workflow_repository.workflow_canvas(req)
logger.debug(f"get workflow info from db result: {canvas_result}")
logger.info(f"Retrieved workflow canvas: {req.workflow_id}, version: {req.workflow_version or 'draft'}")
if canvas_result.code != status.HTTP_200_OK:
return ResponseModel(
code=canvas_result.code,
message=canvas_result.message,
)
try:
if canvas_result.data and "schema" in canvas_result.data:
schema_str = canvas_result.data["schema"]
if schema_str:
schema = json.loads(schema_str)
is_changed = _sync_model_config_in_schema(schema, req.space_id)
if is_changed:
new_schema_str = json.dumps(schema)
canvas_result.data["schema"] = new_schema_str
is_draft = not req.workflow_version or req.workflow_version == 'draft'
if is_draft:
logger.info(f"Auto-saving updated model config for workflow {req.workflow_id}")
update_data = {
"workflow_id": req.workflow_id,
"space_id": req.space_id,
"schema": new_schema_str
}
workflow_repository.workflow_save(update_data)
except Exception as e:
logger.error(f"Failed to sync model config in workflow canvas: {e}")
return ResponseModel(
code=status.HTTP_200_OK,
message="canvas workflow success",
data=WorkflowBaseResponse(workflow=WorkflowBase(**canvas_result.data))
)
@with_exception_handling
def workflow_convert(
req: WorkflowId,
current_user: dict,
skip_validation: bool = False
) -> ResponseModel:
"""转换工作流数据格式"""
_ = check_user_space(req.space_id, current_user)
canvas_result = workflow_repository.workflow_canvas(req)
logger.debug(f"get workflow info from db result: {canvas_result}")
if canvas_result.code != status.HTTP_200_OK:
return ResponseModel(
code=canvas_result.code,
message=canvas_result.message,
)
workflow = convert.workflow_convert(WorkflowBase(**canvas_result.data), skip_validation=skip_validation)
logger.debug(f"workflow info convert dl: {workflow}")
return ResponseModel(
code=status.HTTP_200_OK,
message="convert workflow success",
data=workflow
)
@with_exception_handling
def workflow_export_py(
req: WorkflowId,
current_user: dict
) -> ResponseModel:
"""Export a workflow as a runnable Python script."""
from openjiuwen_studio.core.manager.workflow_code_generator import generate_workflow_python
_ = check_user_space(req.space_id, current_user)
canvas_result = workflow_repository.workflow_canvas(req)
if canvas_result.code != status.HTTP_200_OK:
return ResponseModel(
code=canvas_result.code,
message=canvas_result.message,
)
dl_workflow = convert.workflow_convert(WorkflowBase(**canvas_result.data), skip_validation=True)
python_code = generate_workflow_python(dl_workflow)
logger.info(f"Exported workflow as Python script: {req.workflow_id}")
return ResponseModel(
code=status.HTTP_200_OK,
message="export workflow python success",
data={"workflow_id": req.workflow_id, "python_code": python_code}
)
@with_exception_handling
def workflow_export(
req: WorkflowId,
current_user: dict
) -> ResponseModel:
"""Export a workflow as executable DSL JSON with dependencies."""
_type_prefix_to_key: dict[int, str] = {
int(ComponentType.COMPONENT_TYPE_START): "Start",
int(ComponentType.COMPONENT_TYPE_END): "End",
int(ComponentType.COMPONENT_TYPE_LLM): "LLM",
int(ComponentType.COMPONENT_TYPE_IF): "Condition",
int(ComponentType.COMPONENT_TYPE_LOOP): "Loop",
int(ComponentType.COMPONENT_TYPE_INTENT): "Intent",
int(ComponentType.COMPONENT_TYPE_QUESTION): "Questioner",
int(ComponentType.COMPONENT_TYPE_INPUT): "Input",
int(ComponentType.COMPONENT_TYPE_OUTPUT): "Output",
int(ComponentType.COMPONENT_TYPE_CODE): "Code",
int(ComponentType.COMPONENT_TYPE_TEXT_EDITOR): "TextEditor",
int(ComponentType.COMPONENT_TYPE_CONTINUE): "Continue",
int(ComponentType.COMPONENT_TYPE_BREAK): "Break",
int(ComponentType.COMPONENT_TYPE_SUB_WORKFLOW): "Workflow",
int(ComponentType.COMPONENT_TYPE_EMPTY_START): "BlockStart",
int(ComponentType.COMPONENT_TYPE_EMPTY_END): "BlockEnd",
int(ComponentType.COMPONENT_TYPE_SET_VARIABLE): "Variable",
int(ComponentType.COMPONENT_TYPE_VARIABLE_MERGE): "VariableMerge",
int(ComponentType.COMPONENT_TYPE_PLUGIN): "Plugin",
int(ComponentType.COMPONENT_TYPE_HTTP_REQUEST): "HttpRequest",
int(ComponentType.COMPONENT_TYPE_REACT_AGENT): "ReactAgent",
int(ComponentType.COMPONENT_TYPE_KNOWLEDGE_RETRIEVAL): "KnowledgeRetrieval",
}
def _rewrite_export_component_id_prefixes(dsl_root: dict) -> None:
if not isinstance(dsl_root, dict):
return
id_map: dict[str, str] = {}
def _type_int(v: Any) -> int | None:
if isinstance(v, int):
return v
if isinstance(v, str):
s = v.strip()
if s.isdigit():
return int(s)
return None
def _new_id(cid: str, t: int) -> str | None:
key = _type_prefix_to_key.get(t)
if not key:
return None
try:
prefix, rest = cid.split("_", 1)
except ValueError:
return None
if prefix.isdigit() and int(prefix) == t:
new = f"{key}_{rest}"
return new if new != cid else None
return None
def _collect(obj: Any) -> None:
if isinstance(obj, dict):
comps = obj.get("components")
if isinstance(comps, list):
for c in comps:
if not isinstance(c, dict):
continue
t = _type_int(c.get("type"))
cid = c.get("id")
if t is None or not isinstance(cid, str) or not cid:
continue
new = _new_id(cid, t)
if new:
id_map[cid] = new
c["id"] = new
for v in obj.values():
_collect(v)
elif isinstance(obj, list):
for it in obj:
_collect(it)
def _sub_in_str(s: str) -> str:
for old in sorted(id_map.keys(), key=len, reverse=True):
if old in s:
s = s.replace(old, id_map[old])
return s
def _replace(obj: Any) -> None:
if not id_map:
return
if isinstance(obj, dict):
for k, v in list(obj.items()):
if isinstance(v, str):
obj[k] = _sub_in_str(v)
else:
_replace(v)
elif isinstance(obj, list):
for i, item in enumerate(obj):
if isinstance(item, str):
obj[i] = _sub_in_str(item)
else:
_replace(item)
_collect(dsl_root)
_replace(dsl_root)
def _canvas_schema_dict(canvas_data: dict | None) -> dict:
if not canvas_data or not isinstance(canvas_data, dict):
return {}
raw = canvas_data.get("schema") or canvas_data.get("workflow_schema")
if isinstance(raw, str):
try:
return json.loads(raw or "{}")
except json.JSONDecodeError:
return {}
if isinstance(raw, dict):
return raw
return {}
def _node_id_to_llm_model_id(canvas_data: dict | None) -> dict[str, str]:
"""画布节点 id -> 模型配置表 id(llmParam.model.id)。"""
out: dict[str, str] = {}
for node in _canvas_schema_dict(canvas_data).get("nodes") or []:
if not isinstance(node, dict):
continue
nid = node.get("id")
data = node.get("data")
if not nid or not isinstance(data, dict):
continue
inputs = data.get("inputs")
if not isinstance(inputs, dict):
continue
llm = inputs.get("llm_param") or inputs.get("llmParam")
if not isinstance(llm, dict):
continue
model = llm.get("model")
if not isinstance(model, dict):
continue
mid = model.get("id")
if mid is None or mid == "":
continue
out[str(nid)] = str(mid)
return out
def _patch_export_model_fields(export_root: dict, canvas_data: dict | None, space_id: str) -> None:
"""导出 DSL 中补全 configs.model.model_id,同步 model_name,并在 model_client_config 中写入 parameters;导出文件中不包含真实 api_key。"""
id_map = _node_id_to_llm_model_id(canvas_data)
db_params_by_model_id: dict[str, dict | None] = {}
def _db_parameters_for_model(model_id_str: str) -> dict | None:
if model_id_str in db_params_by_model_id:
return db_params_by_model_id[model_id_str]
out: dict | None = None
try:
mid_int = int(model_id_str)
except (ValueError, TypeError):
db_params_by_model_id[model_id_str] = None
return None
try:
db = SessionLocal()
try:
mgr = ModelConfigManager(db)
mc = mgr.get_config_by_id(mid_int, space_id)
p = mc.parameters
out = dict(p) if isinstance(p, dict) else {}
finally:
db.close()
except Exception as e:
logger.debug(
"[WORKFLOW_EXPORT] Skip model parameters lookup for model_id=%s: %s",
model_id_str,
e,
)
out = None
db_params_by_model_id[model_id_str] = out
return out
def _patch_one_model(m: dict, comp_id: str | None) -> None:
if not isinstance(m, dict):
return
rc = m.get("request_config")
if not isinstance(rc, dict):
return
mcc = m.get("model_client_config")
if mcc is None:
mcc = {}
m["model_client_config"] = mcc
elif not isinstance(mcc, dict):
return
mn = rc.get("model_name") or ""
if mn and not (mcc.get("model_name") or "").strip():
mcc["model_name"] = mn
if comp_id and not (m.get("model_id") or "").strip():
mid = id_map.get(str(comp_id))
if mid:
m["model_id"] = mid
resolved_mid = (m.get("model_id") or "").strip()
if not resolved_mid and comp_id:
resolved_mid = id_map.get(str(comp_id)) or ""
defaults = ModelParameters()
dbp = _db_parameters_for_model(resolved_mid) if resolved_mid else None
temperature = rc.get("temperature")
if temperature is None:
temperature = (dbp or {}).get("temperature") if dbp else None
if temperature is None:
temperature = defaults.temperature
top_p = rc.get("top_p")
if top_p is None:
top_p = (dbp or {}).get("top_p") if dbp else None
if top_p is None:
top_p = defaults.top_p
max_tokens = rc.get("max_tokens")
if max_tokens is None and dbp is not None:
max_tokens = dbp.get("max_tokens")
if max_tokens is None:
max_tokens = defaults.max_tokens
mcc["parameters"] = {
"temperature": temperature,
"max_tokens": max_tokens,
"top_p": top_p,
}
def _walk(obj: Any) -> None:
if isinstance(obj, dict):
mcc_any = obj.get("model_client_config")
if isinstance(mcc_any, dict):
mcc_any["api_key"] = None
cid = obj.get("id")
cfg = obj.get("configs")
if isinstance(cfg, dict) and isinstance(cfg.get("model"), dict):
_patch_one_model(cfg["model"], str(cid) if cid is not None else None)
for v in obj.values():
_walk(v)
elif isinstance(obj, list):
for item in obj:
_walk(item)
_walk(export_root)
def _extract_sub_workflow_refs(payload) -> list[tuple[str, str]]:
refs: list[tuple[str, str]] = []
seen_refs: set[tuple[str, str]] = set()
def _normalize_sub_info(sub_info: dict) -> tuple[str, str]:
if "id" in sub_info:
sub_id = sub_info.get("id", "")
sub_version = sub_info.get("version", "draft") or "draft"
return sub_id, sub_version
sub_id = sub_info.get("workflowId", "")
sub_version = sub_info.get("workflowVersion", "draft") or "draft"
return sub_id, sub_version
def _add_ref(sub_info: dict):
sub_id, sub_version = _normalize_sub_info(sub_info)
if not sub_id:
return
key = (sub_id, sub_version)
if key in seen_refs:
return
seen_refs.add(key)
refs.append(key)
def _walk_payload(node):
if node is None:
return
if isinstance(node, list):
for item in node:
_walk_payload(item)
return
if not isinstance(node, dict):
return
sub_info = (
node.get("sub_workflow_info")
or node.get("subWorkflowInfo")
or node.get("subWorkflow")
)
if isinstance(sub_info, dict):
_add_ref(sub_info)
for value in node.values():
_walk_payload(value)
_walk_payload(payload)
return refs
def _build_workflow_dsl(workflow_id: str, workflow_version: str) -> dict | None:
wf_req = WorkflowId(
workflow_id=workflow_id,
space_id=req.space_id,
workflow_version=workflow_version,
)
sub_canvas_result = workflow_repository.workflow_canvas(wf_req)
if sub_canvas_result.code != status.HTTP_200_OK:
logger.warning(
f"[WORKFLOW_EXPORT] Skip sub workflow {workflow_id}:{workflow_version}, "
f"canvas fetch failed: {sub_canvas_result.message}"
)
return None
sub_dl_workflow = convert.workflow_convert(
WorkflowBase(**sub_canvas_result.data),
skip_validation=True,
export_raw_auth=True
)
sub_executable_workflow = ExecutableWorkflow(sub_dl_workflow, req.space_id, current_user)
sub_export = sub_executable_workflow.dl_workflow.model_dump()
_patch_export_model_fields(sub_export, sub_canvas_result.data, req.space_id)
_rewrite_export_component_id_prefixes(sub_export)
return sub_export
_ = check_user_space(req.space_id, current_user)
canvas_result = workflow_repository.workflow_canvas(req)
if canvas_result.code != status.HTTP_200_OK:
return ResponseModel(
code=canvas_result.code,
message=canvas_result.message,
)
dl_workflow = convert.workflow_convert(
WorkflowBase(**canvas_result.data),
skip_validation=True,
export_raw_auth=True
)
executable_workflow = ExecutableWorkflow(dl_workflow, req.space_id, current_user)
export_data = executable_workflow.dl_workflow.model_dump()
_patch_export_model_fields(export_data, canvas_result.data, req.space_id)
_rewrite_export_component_id_prefixes(export_data)
dependency_workflows: list[dict] = []
processed: set[tuple[str, str]] = {(req.workflow_id, req.workflow_version or "draft")}
pending = _extract_sub_workflow_refs(export_data)
while pending:
sub_id, sub_version = pending.pop(0)
key = (sub_id, sub_version)
if key in processed:
continue
processed.add(key)
sub_dsl = _build_workflow_dsl(sub_id, sub_version)
if not sub_dsl:
continue
dependency_workflows.append(sub_dsl)
pending.extend(_extract_sub_workflow_refs(sub_dsl))
export_data["dependencies"] = {"workflows": dependency_workflows}
logger.info(f"[WORKFLOW_EXPORT] Collected sub workflow DSL count: {len(dependency_workflows)}")
logger.info(f"Exported workflow DSL JSON: {req.workflow_id}")
return ResponseModel(
code=status.HTTP_200_OK,
message="export workflow success",
data=export_data
)
@with_exception_handling
def workflow_delete(
req: WorkflowId,
current_user: dict
) -> ResponseModel:
"""删除工作流"""
logger.warning(
f"Workflow deletion attempt by user {current_user.get('user_id', 'unknown')} - Workflow ID: {req.workflow_id}")
_ = check_user_space(req.space_id, current_user)
try:
can_delete, message = check_referenced_dependencies(
reference_repository, req.space_id, "WORKFLOW", req.workflow_id
)
if not can_delete:
logger.warning(f"Workflow deletion blocked due to dependencies: {req.workflow_id} - {message}")
return ResponseModel(
code=status.HTTP_400_BAD_REQUEST,
message=message,
)
except Exception as e:
logger.error(f"Error checking dependencies for workflow {req.workflow_id}: {e}")
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message="Error checking dependencies, deletion blocked for safety",
)
delete_result = workflow_repository.workflow_draft_delete(req)
logger.debug(f"delete workflow info in db result: {delete_result}")
if delete_result.code == status.HTTP_200_OK:
logger.info(f"Workflow deleted successfully: {req.workflow_id}")
else:
logger.error(f"Failed to delete workflow {req.workflow_id}: {delete_result.message}")
if delete_result.code != status.HTTP_200_OK:
return ResponseModel(
code=delete_result.code,
message=delete_result.message,
)
try:
cleanup_result = reference_repository.reference_delete_by_referer(
req.space_id, "WORKFLOW", req.workflow_id
)
if cleanup_result["code"] != status.HTTP_200_OK:
logger.warning(
f"Failed to cleanup references for deleted workflow {req.workflow_id}: {cleanup_result['message']}")
except Exception as e:
logger.error(f"Error cleaning up references for workflow {req.workflow_id}: {e}")
return ResponseModel(
code=status.HTTP_200_OK,
message="delete workflow success",
)
@with_exception_handling
def workflow_publish_delete(
req: WorkflowId,
current_user: dict
) -> ResponseModel:
"""删除publish工作流"""
logger.warning(
f"Workflow publish deletion attempt by user {current_user.get('user_id', 'unknown')} - Workflow ID: {req.workflow_id} v{req.workflow_version}")
_ = check_user_space(req.space_id, current_user)
try:
result = reference_repository.reference_list_by_referenced(
req.space_id, "WORKFLOW", req.workflow_id
)
if result["code"] == status.HTTP_200_OK and result["data"]:
version_refs = [ref for ref in result["data"] if ref.get('referenced_version') == req.workflow_version]
if version_refs:
referrers = []
for ref in version_refs:
referrer_info = f"{ref['referer_type']}({ref['referer_id']}"
if ref.get('referer_version') and ref['referer_version'] != 'draft':
referrer_info += f":{ref['referer_version']}"
referrer_info += ")"
referrers.append(referrer_info)
logger.warning(
f"Workflow publish version deletion blocked due to dependencies: {req.workflow_id}:{req.workflow_version} - referenced by {', '.join(referrers)}")
return ResponseModel(
code=status.HTTP_400_BAD_REQUEST,
message=f"Cannot delete version {req.workflow_version}: referenced by {', '.join(referrers)}",
)
except Exception as e:
logger.error(f"Error checking dependencies for workflow {req.workflow_id}:{req.workflow_version}: {e}")
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message="Error checking dependencies, deletion blocked for safety",
)
delete_result = workflow_repository.workflow_publish_delete(req)
logger.debug(f"delete workflow publish info in db result: {delete_result}")
if delete_result.code == status.HTTP_200_OK:
logger.info(f"Workflow publish deleted successfully: {req.workflow_id} v{req.workflow_version}")
else:
logger.error(f"Failed to delete workflow publish {req.workflow_id}: {delete_result.message}")
if delete_result.code != status.HTTP_200_OK:
return ResponseModel(
code=delete_result.code,
message=delete_result.message,
)
try:
cleanup_result = reference_repository.reference_delete_by_referer_with_version(
req.space_id, "WORKFLOW", req.workflow_id, req.workflow_version
)
if cleanup_result["code"] != status.HTTP_200_OK:
logger.warning(
f"Failed to cleanup references for deleted workflow publish {req.workflow_id}:{req.workflow_version}: {cleanup_result['message']}")
except Exception as e:
logger.error(f"Error cleaning up references for workflow publish {req.workflow_id}:{req.workflow_version}: {e}")
return ResponseModel(
code=status.HTTP_200_OK,
message="delete workflow publish success",
)
@with_exception_handling
def workflow_canvas_save(
req: WorkflowSave,
current_user: dict
) -> ResponseModel:
start_time = time.time()
logger.info(
f"Starting workflow save request for user {current_user.get('user_id', 'unknown')} - Workflow ID: {req.workflow_id}")
_ = check_user_space(req.space_id, current_user)
workflow_id = WorkflowId(
workflow_id=req.workflow_id,
space_id=req.space_id,
)
canvas_result = workflow_repository.workflow_canvas(workflow_id)
logger.debug(f"get workflow info from db result: {canvas_result}")
if canvas_result.code != status.HTTP_200_OK:
return ResponseModel(
code=canvas_result.code,
message=canvas_result.message,
)
workflow = WorkflowBase(**canvas_result.data)
canvas_data = json.loads(req.schema)
schema_size = len(req.schema)
if schema_size > 1024 * 1024:
logger.warning(
f"Large workflow data detected: {schema_size / 1024 / 1024:.2f}MB - Workflow ID: {req.workflow_id}")
inputs, outputs = convert.extract_inputs_and_outputs_from_canvas(canvas_data)
save_data = {
"workflow_id": req.workflow_id,
"space_id": req.space_id,
"schema": req.schema,
"input_parameters": inputs,
"output_parameters": outputs
}
save_result = workflow_repository.workflow_save(save_data)
logger.debug(f"save workflow info into db result: {save_result}")
if save_result and save_result.code == status.HTTP_200_OK:
logger.info(f"Workflow saved successfully: {req.workflow_id}")
else:
logger.error(
f"Failed to save workflow {req.workflow_id}: {save_result.message if save_result else 'Unknown error'}")
if not save_result or save_result.code != status.HTTP_200_OK:
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"Workflow with id {req.workflow_id} save into db failed"
)
try:
delete_result = reference_repository.reference_delete_by_referer_with_version(
req.space_id, "WORKFLOW", req.workflow_id, "draft"
)
if delete_result["code"] != status.HTTP_200_OK:
logger.warning(
f"Failed to delete old references for workflow {req.workflow_id}: {delete_result['message']}")
references = extract_workflow_references(req.schema, req.space_id, req.workflow_id, "draft")
for ref in references:
create_result = reference_repository.reference_create(ref)
if create_result["code"] != status.HTTP_200_OK:
logger.warning(f"Failed to create reference {ref}: {create_result['message']}")
logger.info(
f"Reference management completed for workflow {req.workflow_id}: {len(references)} references processed")
except Exception as e:
logger.error(f"Error managing references for workflow {req.workflow_id}: {e}")
res_data = WorkflowResponseSave(
name=workflow.name,
url=workflow.url,
status=0,
workflow_status=0
)
logger.debug(f"save workflow info response data: {res_data}")
end_time = time.time()
execution_time = end_time - start_time
logger.info(f"Workflow save completed successfully in {execution_time:.3f}s - ID: {req.workflow_id}")
return ResponseModel(
code=status.HTTP_200_OK,
message=f"save workflow with id {req.workflow_id} into db success",
data=res_data
)
@with_exception_handling
def _update_workflow_name_in_agents(
space_id: str,
workflow_id: str,
new_workflow_name: str,
new_workflow_desc: str = None
) -> ResponseModel[dict]:
"""同步更新所有引用该工作流的agent配置中的工作流名称和描述"""
with get_db_jw() as db:
updated_count = 0
failed_count = 0
errors = []
agent_db = JiuwenBaseRepository(db, AgentBaseDB)
draft_agents = agent_db.get_dl_in_sql(
find_id={"space_id": space_id},
return_first_item=False
)
if draft_agents.code == status.HTTP_200_OK and draft_agents.data:
for agent_data in draft_agents.data:
try:
agent_id = agent_data.get("agent_id")
workflows = agent_data.get("workflows", [])
if not isinstance(workflows, list):
workflows = []
updated = False
for workflow in workflows:
if workflow.get("workflow_id") == workflow_id:
if new_workflow_name:
workflow["workflow_name"] = new_workflow_name
if new_workflow_desc:
workflow["description"] = new_workflow_desc
updated = True
if updated:
agent_data["workflows"] = workflows
agent_data["update_time"] = milliseconds()
if "primary_id" in agent_data:
del agent_data["primary_id"]
update_result = agent_db.update_dl_in_sql(
find_id={
"agent_id": agent_id,
"agent_version": AgentBaseDB.__version_none__
},
update_dl=agent_data
)
if update_result.code == status.HTTP_200_OK:
updated_count += 1
logger.info(
f"[WORKFLOW_UPDATE] Updated workflow name in agent {agent_id} (draft version)")
else:
failed_count += 1
errors.append(f"Failed to update agent {agent_id} (draft): {update_result.message}")
except Exception as e:
failed_count += 1
agent_id_str = agent_data.get('agent_id', 'unknown')
error_msg = f"Error processing agent {agent_id_str} "
error_msg += f"(draft): {str(e)}"
errors.append(error_msg)
logger.error(
f"[WORKFLOW_UPDATE] Failed to update workflow name "
f"in agent {agent_id_str}: {e}")
agent_publish_db = JiuwenBaseRepository(db, AgentPublishDB)
publish_agents = agent_publish_db.get_dl_in_sql(
find_id={"space_id": space_id},
return_first_item=False
)
if publish_agents.code == status.HTTP_200_OK and publish_agents.data:
for agent_data in publish_agents.data:
try:
agent_id = agent_data.get("agent_id")
agent_version = agent_data.get("agent_version")
workflows = agent_data.get("workflows", [])
if not isinstance(workflows, list):
workflows = []
updated = False
for workflow in workflows:
if workflow.get("workflow_id") == workflow_id:
if new_workflow_name:
workflow["workflow_name"] = new_workflow_name
if new_workflow_desc:
workflow["description"] = new_workflow_desc
updated = True
if updated:
agent_data["workflows"] = workflows
agent_data["update_time"] = milliseconds()
if "primary_id" in agent_data:
del agent_data["primary_id"]
update_result = agent_publish_db.update_dl_in_sql(
find_id={
"agent_id": agent_id,
"agent_version": agent_version
},
update_dl=agent_data
)
if update_result.code == status.HTTP_200_OK:
updated_count += 1
logger.info(
f"[WORKFLOW_UPDATE] Updated workflow name in agent {agent_id} "
f"(publish v{agent_version})")
else:
failed_count += 1
error_msg = f"Failed to update agent {agent_id} v{agent_version} "
error_msg += f"(publish): {update_result.message}"
errors.append(error_msg)
except Exception as e:
failed_count += 1
agent_id_str = agent_data.get('agent_id', 'unknown')
error_msg = f"Error processing agent {agent_id_str} "
error_msg += f"(publish): {str(e)}"
errors.append(error_msg)
logger.error(
f"[WORKFLOW_UPDATE] Failed to update workflow name "
f"in agent {agent_id_str} (publish): {e}")
return ResponseModel(
code=status.HTTP_200_OK,
message=f"Updated workflow name in agents: {updated_count} updated, {failed_count} failed",
data={
"updated_count": updated_count,
"failed_count": failed_count,
"errors": errors if errors else None
}
)
@with_exception_handling
def workflow_meta_update(
req: WorkflowUpdate,
current_user: dict
) -> ResponseModel:
_ = check_user_space(req.space_id, current_user)
update_data = {
"workflow_id": req.workflow_id,
"space_id": req.space_id
}
if req.name is not None:
update_data["name"] = req.name
if req.desc is not None:
update_data["desc"] = req.desc
if req.url is not None:
update_data["url"] = req.url
if req.icon_uri is not None:
update_data["icon_uri"] = req.icon_uri
update_result = workflow_repository.workflow_save(update_data)
logger.debug(f"update workflow meta into db result: {update_result}")
if update_result and update_result.code == status.HTTP_200_OK:
logger.info(f"Workflow metadata updated successfully: {req.workflow_id}")
else:
logger.error(
f"Failed to update workflow metadata {req.workflow_id}: {update_result.message if update_result else 'Unknown error'}")
if not update_result or update_result.code != status.HTTP_200_OK:
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"Workflow with id {req.workflow_id} save into db failed"
)
processed_tags = []
if req.tags is not None:
try:
processed_tags = update_workflow_tags(req.workflow_id, req.space_id, req.tags, current_user)
logger.info(f"Updated tags incrementally for workflow {req.workflow_id}: {processed_tags}")
except ValueError as e:
logger.warning(f"Tag update failed for workflow {req.workflow_id}: {str(e)}")
return ResponseModel(
code=status.HTTP_400_BAD_REQUEST,
message=str(e)
)
if req.name is not None or req.desc is not None:
sync_result = _update_workflow_name_in_agents(
space_id=req.space_id,
workflow_id=req.workflow_id,
new_workflow_name=req.name,
new_workflow_desc=req.desc
)
logger.info(
f"Synced workflow name in agents: {sync_result.data['updated_count']} updated, "
f"{sync_result.data['failed_count']} failed"
)
if req.name is not None:
pr_result = prompt_relation_repository.update_member_name_in_prompt_relation(
space_id=req.space_id,
member_type="WORKFLOW",
member_id=req.workflow_id,
new_name=req.name,
)
if pr_result.code == status.HTTP_200_OK:
logger.info(f"Synced workflow name in prompt_relation: {pr_result.message}")
else:
logger.warning(f"Sync workflow name in prompt_relation failed: {pr_result.message}")
res_data = WorkflowResponseUpdate(
workflow_id=req.workflow_id,
success=True
)
logger.debug(f"update workflow meta response data: {res_data}")
return ResponseModel(
code=status.HTTP_200_OK,
message=f"save workflow with id {req.workflow_id} into db success",
data=res_data.model_dump()
)
@with_exception_handling
def workflow_list(
req: WorkflowList,
current_user: dict
) -> ResponseModel:
_ = check_user_space(req.space_id, current_user)
list_result = workflow_repository.workflow_list(SpaceAWPQuery.model_validate(req.model_dump()))
logger.debug(f"get workflow list from db result: {list_result.code}")
if list_result.code == status.HTTP_404_NOT_FOUND or list_result.code == status.HTTP_400_BAD_REQUEST:
return ResponseModel(
code=status.HTTP_200_OK,
message=f"Get workflow list with space_id {req.space_id} failed, error: {list_result.message}",
)
elif list_result.code != status.HTTP_200_OK:
return ResponseModel(
code=list_result.code,
message=f"Get workflow list with space_id {req.space_id} failed, error: {list_result.message}",
)
data_list = list_result.data.get("workflow_list", [])
res_list = _process_workflow_data_list(data_list)
total, page, page_size, total_pages = _validate_and_normalize_pagination_data(
list_result.data,
default_page=req.page or 1,
default_page_size=req.page_size or 10
)
res_data = WorkflowResponseList(
workflow_list=res_list,
total=total,
page=page,
page_size=page_size,
total_pages=total_pages
)
logger.info(
f"get workflow list success: {len(res_data.workflow_list)} workflows, page {res_data.page}/{res_data.total_pages}")
return ResponseModel(
code=status.HTTP_200_OK,
message="Get workflow list success",
data=res_data
)
@with_exception_handling
def workflow_search(
req: WorkflowSearchRequest,
current_user: dict
) -> ResponseModel:
"""搜索工作流"""
_ = check_user_space(req.space_id, current_user)
search_params = {
"space_id": req.space_id,
"search_term": req.search_term or "",
"tags": req.tags or [],
"status_filter": req.status_filter or "all",
"sort_by": req.sort_by.value if req.sort_by else "update_time",
"sort_order": req.sort_order.value if req.sort_order else "desc",
"page": req.page or 1,
"page_size": req.page_size or 10
}
try:
search_result = workflow_repository.workflow_search(search_params)
logger.debug(f"search workflow from db result: {search_result.code}")
if search_result.code != status.HTTP_200_OK:
return ResponseModel(
code=search_result.code,
message=f"Search workflow with space_id {req.space_id} failed, error: {search_result.message}",
)
except Exception as e:
logger.error(f"Search workflow exception: {str(e)}")
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"Search workflow with space_id {req.space_id} failed, error: {str(e)}"
)
if not search_result.data or not isinstance(search_result.data, dict):
logger.warning(f"Invalid search result data structure: {search_result.data}")
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message="Invalid search result data structure"
)
data_list = search_result.data.get("workflow_list", [])
if not isinstance(data_list, list):
logger.warning(f"Invalid workflow_list in search result: {data_list}")
data_list = []
res_list = _process_workflow_data_list(data_list)
total, page, page_size, total_pages = _validate_and_normalize_pagination_data(
search_result.data,
default_page=req.page or 1,
default_page_size=req.page_size or 10
)
try:
res_data = WorkflowSearchResponse(
workflow_list=res_list,
total=total,
page=page,
page_size=page_size,
total_pages=total_pages,
search_term=req.search_term,
filters={
"tags": req.tags or [],
"status_filter": req.status_filter or "all",
"sort_by": search_params["sort_by"],
"sort_order": search_params["sort_order"]
}
)
logger.info(
f"search workflow success: {len(res_data.workflow_list)} results for term '{req.search_term}', page {res_data.page}/{res_data.total_pages}")
except Exception as e:
logger.error(f"Error creating WorkflowSearchResponse: {str(e)}")
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"Error creating search response: {str(e)}"
)
return ResponseModel(
code=status.HTTP_200_OK,
message="Search workflow success",
data=res_data
)
def deal_db_error(result: ResponseModel) -> ResponseModel:
if result is None:
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message="publish workflow failed, error: result can not be None",
data=None
)
if result.code == status.HTTP_400_BAD_REQUEST or result.code == status.HTTP_404_NOT_FOUND:
return ResponseModel(
code=status.HTTP_200_OK,
message=f"publish workflow failed, error: {result.message}",
data=None
)
return ResponseModel(
code=result.code,
message=f"publish workflow failed, error: {result.message}",
data=None
)
@with_exception_handling
def workflow_publish(
req: WorkflowPublish,
current_user: dict
) -> ResponseModel:
start_time = time.time()
logger.warning(
f"Workflow publish attempt by user {current_user.get('user_id', 'unknown')} - Workflow ID: {req.workflow_id}, Version: {req.workflow_version}")
logger.info(
f"Starting workflow publish request for user {current_user.get('user_id', 'unknown')} - Workflow ID: {req.workflow_id}, Version: {req.workflow_version}")
try:
_ = check_user_space(req.space_id, current_user)
workflow_latest_version_query = WorkflowId(
workflow_id=req.workflow_id,
space_id=req.space_id,
workflow_version="latest_publish_version"
)
get_version_result = workflow_repository.workflow_publish_get(workflow_latest_version_query)
logger.debug(f"get version workflow info: {get_version_result}")
if get_version_result.code == status.HTTP_200_OK:
logger.info(f"Retrieved latest version for workflow: {req.workflow_id}")
is_latest_found = True
if get_version_result.code == status.HTTP_404_NOT_FOUND:
is_latest_found = False
elif get_version_result.code != status.HTTP_200_OK:
return ResponseModel(
code=get_version_result.code,
message=f"Get versioned workflow with id {req.workflow_id} failed, error: {get_version_result.message}",
data=None
)
if is_latest_found:
try:
latest_version_data = WorkflowPublishDBPd(**get_version_result.data)
except Exception as e:
logger.error(f"Failed to parse latest version data: {str(e)}")
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"Invalid latest version data format: {str(e)}",
data=None
)
else:
latest_version_data = None
try:
if latest_version_data:
check_res, check_err = check_version(latest_version_data.workflow_version, req.workflow_version)
logger.info(f"get latest workflow info, check version {check_res}")
if not check_res:
return ResponseModel(
code=status.HTTP_400_BAD_REQUEST,
message=f"check version failed, error: {check_err}",
data=None
)
else:
current_version, check_err = Version.string_to_object(req.workflow_version)
logger.info(f"no latest workflow info, check version {check_err is None}")
if check_err is not None:
return ResponseModel(
code=status.HTTP_400_BAD_REQUEST,
message=f"check version {req.workflow_version} failed, error: {check_err}",
data=None
)
except Exception as e:
logger.error(f"Version validation error: {str(e)}")
return ResponseModel(
code=status.HTTP_400_BAD_REQUEST,
message=f"Version validation failed: {str(e)}",
data=None
)
workflow_draft_query = WorkflowId(
workflow_id=req.workflow_id,
space_id=req.space_id
)
canvas_result = workflow_repository.workflow_canvas(workflow_draft_query)
logger.info(f"get draft workflow info: {canvas_result}")
if canvas_result.code != status.HTTP_200_OK:
return ResponseModel(
code=canvas_result.code,
message=f"Get workflow with id {req.workflow_id} failed, error: {canvas_result.message}",
data=None
)
try:
wf_data = WorkflowBaseDBPd(**canvas_result.data)
except Exception as e:
logger.error(f"Failed to parse workflow data: {str(e)}")
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"Invalid workflow data format: {str(e)}",
data=None
)
try:
logger.info(f"validating workflow {req.workflow_id} before publish")
from openjiuwen_studio.core.executor.workflow.workflow_runner import flow_mgr
flow_mgr.validate(req.workflow_id, "draft", req.space_id, current_user)
logger.info(f"workflow {req.workflow_id} validation passed")
except Exception as e:
logger.error(f"workflow validation failed for {req.workflow_id}: {str(e)}")
return ResponseModel(
code=status.HTTP_400_BAD_REQUEST,
message=f"Workflow validation failed: {str(e)}",
data=None
)
try:
workflow_data = wf_data.model_dump(exclude_none=True, exclude={"workflow_version"})
current_time = milliseconds()
workflow_data["create_time"] = current_time
workflow_data["update_time"] = current_time
workflow_data["workflow_version"] = req.workflow_version
workflow_data["version_description"] = req.version_description
logger.info(f"Creating version data with workflow_version: {req.workflow_version}")
logger.debug(f"Version data keys: {list(workflow_data.keys())}")
version_data = WorkflowPublishDBPd(**workflow_data)
except Exception as e:
logger.error(f"Failed to create version data: {str(e)}")
try:
test_data = wf_data.model_dump(exclude_none=True)
logger.error(f"exclude_none data: {test_data}")
for key, value in test_data.items():
logger.error(f"Field {key}: {value} (type: {type(value)})")
except Exception as debug_e:
logger.error(f"Debug error: {str(debug_e)}")
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"Failed to prepare version data: {str(e)}",
data=None
)
try:
publish_result = workflow_repository.workflow_publish(version_data)
logger.debug(f"publish workflow info result: {publish_result}")
if publish_result.code == status.HTTP_200_OK:
logger.info(f"Workflow published successfully: {req.workflow_id} version {req.workflow_version}")
else:
logger.error(f"Failed to publish workflow {req.workflow_id}: {publish_result.message}")
if publish_result.code != status.HTTP_200_OK:
return deal_db_error(publish_result)
except Exception as e:
logger.error(f"Failed to publish workflow to database: {str(e)}")
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"Database publish operation failed: {str(e)}",
data=None
)
try:
references = extract_workflow_references(
wf_data.schema, req.space_id, req.workflow_id, req.workflow_version)
for ref in references:
create_result = reference_repository.reference_create(ref)
if create_result["code"] != status.HTTP_200_OK:
logger.warning(f"Failed to create publish reference {ref}: {create_result['message']}")
logger.info(
f"Publish reference management completed for workflow {req.workflow_id} v{req.workflow_version}: {len(references)} references processed")
except Exception as e:
logger.error(f"Error managing publish references for workflow {req.workflow_id}: {e}")
try:
res_data = WorkflowResponsePublish(
workflow_id=req.workflow_id,
success=True
)
except Exception as e:
logger.error(f"Failed to create response data: {str(e)}")
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"Failed to create response: {str(e)}",
data=None
)
end_time = time.time()
execution_time = end_time - start_time
logger.info(
f"Workflow publish completed successfully in {execution_time:.3f}s - ID: {req.workflow_id}, Version: {req.workflow_version}")
return ResponseModel(
code=status.HTTP_200_OK,
message="publish workflow success",
data=res_data
)
except Exception as e:
logger.error(f"Unexpected error in workflow_publish: {str(e)}")
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"Unexpected error during workflow publish: {str(e)}",
data=None
)
@with_exception_handling
def workflow_copy(
req: WorkflowId,
current_user: dict
) -> ResponseModel:
"""创建新的工作流"""
_ = check_user_space(req.space_id, current_user)
workflow_copy_id = str(uuid.uuid4())
current_time = milliseconds()
if not req.workflow_version:
get_result = workflow_repository.workflow_canvas(req)
else:
get_result = workflow_repository.workflow_publish_get(req)
if get_result.code != status.HTTP_200_OK:
logger.info(f"Copy workflow fail: {get_result}")
return get_result
original_workflow_id = get_result.data.get("workflow_id")
if original_workflow_id:
try:
original_tags = get_workflow_tags(original_workflow_id, req.space_id, req.workflow_version or "draft")
tag_names = [tag.get("tag_name") for tag in original_tags if tag.get("tag_name")]
logger.info(f"Original workflow tags found: {tag_names}")
except Exception as e:
logger.error(f"Error getting original workflow tags: {e}")
tag_names = []
else:
tag_names = []
get_result.data.pop("workflow_version", None)
workflow_base_copy = WorkflowBaseDBPd(**get_result.data)
workflow_base_copy.workflow_id = workflow_copy_id
workflow_base_copy.create_time = current_time
workflow_base_copy.update_time = current_time
workflow_base_copy.name = workflow_base_copy.name + "_copy"
logger.debug(f"copy workflow info: {workflow_base_copy}")
logger.info(f"Copying workflow: {req.workflow_id} -> {workflow_copy_id}")
copy_result = workflow_repository.workflow_create(workflow_base_copy)
logger.debug(f"copy workflow info into db result: {copy_result}")
if copy_result.code == status.HTTP_200_OK:
logger.info(f"Workflow copied successfully: {req.workflow_id} -> {workflow_copy_id}")
else:
logger.error(f"Failed to copy workflow {req.workflow_id}: {copy_result.message}")
if copy_result.code != status.HTTP_200_OK:
return copy_result
if tag_names:
try:
create_workflow_tags(workflow_copy_id, req.space_id, tag_names, current_user)
logger.info(f"Successfully copied {len(tag_names)} tags to workflow {workflow_copy_id}")
except Exception as e:
logger.error(f"Error copying tags to new workflow: {e}")
return ResponseModel(
code=status.HTTP_200_OK,
message="copy workflow success",
data=WorkflowBaseResponse(workflow=WorkflowBase(**workflow_base_copy.model_dump()))
)
def resolve_ref_types(output_parameters: list[dict], schema_str: str) -> list[dict]:
"""Resolve ref and constant types to actual types."""
if not output_parameters or not schema_str:
return output_parameters
try:
schema = json.loads(schema_str)
nodes = {node["id"]: node for node in schema.get("nodes", [])}
end_node = next((node for node in nodes.values()
if node.get("type") == str(ComponentType.COMPONENT_TYPE_END)), None)
if not end_node:
return output_parameters
input_params = end_node.get("data", {}).get("inputs", {}).get("inputParameters", {})
for param in output_parameters:
param_def = input_params.get(param["name"])
if not param_def:
continue
param_type = param["type"]
if param_type == "ref":
content = param_def.get("content", [])
if len(content) >= 2:
node_id, output_name = content[0], content[1]
node = nodes.get(node_id)
if node:
outputs = node.get("data", {}).get("outputs", {})
if outputs.get("type") == "object":
param["type"] = outputs.get("properties", {}).get(output_name, {}).get("type")
else:
param["type"] = outputs.get("type")
elif param_type == "constant":
schema_type = param_def.get("schema", {}).get("type")
if schema_type:
param["type"] = schema_type
return output_parameters
except Exception as e:
logger.error(f"Error resolving ref types: {type(e).__name__}")
return output_parameters
@with_exception_handling
def workflow_version_list(
req: WorkflowVersionListRequest,
current_user: dict
) -> ResponseModel:
"""查询工作流的发布版本列表"""
_ = check_user_space(req.space_id, current_user)
version_result = workflow_repository.get_workflow_publish_list(WorkflowId.model_validate(req.model_dump()))
logger.debug(f"get workflow version list result: {version_result}")
if version_result.code == status.HTTP_200_OK:
logger.info(f"Retrieved version list for workflow: {req.workflow_id}")
else:
logger.warning(f"Failed to retrieve version list for workflow {req.workflow_id}: {version_result.message}")
if version_result.code == status.HTTP_404_NOT_FOUND:
logger.info(f"No published versions found for workflow {req.workflow_id}, returning empty list")
response_data = WorkflowVersionListResponse(
workflow_id=req.workflow_id,
versions=[]
)
return ResponseModel(
code=status.HTTP_200_OK,
message="No workflow version was found",
data=response_data
)
if version_result.code != status.HTTP_200_OK:
return ResponseModel(
code=version_result.code,
message=version_result.message,
data=None
)
version_data = version_result.data or []
versions = []
for version_info in version_data:
versions.append(WorkflowVersionInfo(
workflow_version=version_info.get("workflow_version", ""),
version_description=version_info.get("version_description", ""),
create_time=version_info.get("create_time", 0)
))
response_data = WorkflowVersionListResponse(
workflow_id=req.workflow_id,
versions=versions
)
logger.info(f"get workflow version list success: {len(versions)} versions for workflow {req.workflow_id}")
return ResponseModel(
code=status.HTTP_200_OK,
message="Get workflow version list success",
data=response_data
)
def get_upload_url(
req: dict,
current_user: dict,
minio_client: Minio
) -> ResponseModel:
"""获取文件上传自签名URL"""
space_id = req.get("space_id")
object_key = req.get("object_key")
if not all([space_id, object_key]):
return ResponseModel(
code=status.HTTP_400_BAD_REQUEST,
message="Missing required fields: object_key or space_id"
)
_ = check_user_space(space_id, current_user)
bucket_name = settings.minio_bucket
if not minio_client.bucket_exists(bucket_name):
minio_client.make_bucket(bucket_name)
logger.info(f"桶 '{bucket_name}' 创建成功")
else:
logger.info(f"桶 '{bucket_name}' 已存在")
from datetime import timedelta
upload_url = minio_client.presigned_put_object(
bucket_name=bucket_name,
object_name=object_key,
expires=timedelta(hours=1)
)
return ResponseModel(
code=status.HTTP_200_OK,
message="Get upload_url success",
data={"upload_url": upload_url}
)
def get_download_url(
req: dict,
current_user: dict,
minio_client: Minio
) -> ResponseModel:
"""获取文件上传自签名URL"""
space_id = req.get("space_id")
object_key = req.get("object_key")
if not all([space_id, object_key]):
return ResponseModel(
code=status.HTTP_400_BAD_REQUEST,
message="Missing required fields: object_key or space_id"
)
_ = check_user_space(space_id, current_user)
bucket_name = settings.minio_bucket
try:
minio_client.stat_object(bucket_name, object_key)
except Exception as e:
logger.error(f"Object not found in MinIO: {object_key}, error: {str(e)}")
return ResponseModel(
code=status.HTTP_404_NOT_FOUND,
message=f"File {object_key} not found in storage"
)
protocol = "https" if settings.minio_secure else "http"
download_url = f"{protocol}://{settings.minio_host}:{settings.minio_port}/{bucket_name}/{object_key}"
return ResponseModel(
code=status.HTTP_200_OK,
message="Get download_url success",
data={"download_url": download_url}
)