from typing import Any, Optional, Dict, AsyncGenerator, Union
from pydantic import ValidationError
from fastapi import APIRouter, HTTPException, Request, status, Depends
from openjiuwen_studio.core.common.exceptions import BaseError
from openjiuwen.core.common.logging import logger, set_session_id, get_session_id
from openjiuwen.core.session.interaction.interactive_input import InteractiveInput
from pydantic import BaseModel, Field
from sse_starlette import EventSourceResponse
from openjiuwen_studio.core.executor.agent.agent_runner import agent_mgr, AgentRunner
from openjiuwen_studio.core.executor.component.component_runner import comp_executor
from openjiuwen_studio.core.executor.workflow.pregel_graph_adapter import JiuWenGraphException
from openjiuwen_studio.core.executor.workflow.workflow_runner import flow_mgr, WorkflowRunner
from openjiuwen_studio.core.manager.login_manager.user import get_current_user
from openjiuwen_studio.core.utils.exception import log_exception, handle_http_exception, get_safe_error_message
from openjiuwen_studio.schemas import ResponseModel
from openjiuwen_studio.core.executor.plugin.plugin_mgr import plugin_mgr
from openjiuwen_studio.routers.common import handle_response, validate_request
from openjiuwen_studio.core.manager.repositories.trace_summary_repository import trace_summary_repository
from openjiuwen_studio.core.manager.login_manager.space import check_user_space
from openjiuwen_studio.schemas.trace_summary import (TraceSummaryListRequest, TraceSummaryByTraceIdRequest,
TraceSummaryLatestRequest, TraceSummaryBrief,
TraceSummaryListBySpaceRequest, TraceSummaryBriefWithStatus,
ActiveExecutionInfo)
from openjiuwen_studio.schemas.execution_log import ExecutionLogSummary
from openjiuwen_studio.schemas.memory import DeleteLongtermMem, DeleteVariable, UpdateLongtermMem, UpdateVariable, \
GetUserVar, \
SearchLongtermMem, DeleteScopeLongtermMem
from openjiuwen_studio.core.manager.memory import delete_user_variable, delete_longterm_mem, update_user_variable, \
update_longterm_mem, get_longterm_mem, get_user_variable, delete_longterm_mem_by_scope_id
from openjiuwen_studio.core.common.exceptions import JiuWenExecuteException, WorkflowFailedResponse, \
WorkflowErrorData, ErrorNodeInfo
from openjiuwen_studio.core.common.exceptions import JiuWenComponentException
from openjiuwen_studio.core.common.message import ExecuteResponseType
from openjiuwen_studio.core.executor.workflow.workflow_execution_manager import workflow_execution_manager
from openjiuwen_studio.core.executor.component.component_execution_manager import component_execution_manager
from openjiuwen_studio.core.common.language_thread_context import get_language
from openjiuwen.core.common.exception.codes import StatusCode
execution_router = APIRouter()
def _normalize_language(language: Optional[str]) -> str:
if not language:
return "zh"
language = language.strip().lower()
if language in {"cn", "zh", "zh-cn", "zh-hans", "zh-hans-cn"} or language.startswith("zh"):
return "zh"
return "en"
def _resolve_language(current_user: Optional[dict]) -> str:
"""
解析当前语言。
策略:倾向于英文 (Sticky English)。
1. 如果 HTTP Header 指定了英文,返回英文。
2. 如果用户配置 (Profile) 指定了英文,返回英文。
3. 否则返回中文。
"""
language = get_language()
header_lang = None
if language and language.strip().lower() not in {"cn"}:
header_lang = _normalize_language(language)
if header_lang == "en":
return "en"
if current_user:
locale = (current_user.get("data") or {}).get("locale")
if locale:
user_lang = _normalize_language(locale)
if user_lang == "en":
return "en"
return "zh"
class BaseParas(BaseModel):
space_id: Optional[str] = Field("")
id: str = Field(default="", description="Unique agent ID")
version: str = Field(default="", description="Version of the agent")
conversation_id: str = Field(default="", description="Conversation ID")
class ExecuteParas(BaseParas):
inputs: Dict[str, Any] = Field(default={}, description="Input parameters")
class DeepSearchParas(BaseModel):
search_type: str = Field(default="", description="Search type: search or research")
query: str = Field(description="Query content")
class CompExecuteParas(ExecuteParas):
component_id: str = Field(default="", description="Component ID")
loop_id: str = Field(default="", description="Loop component ID")
class UserInput(BaseModel):
node_id: str = Field(default="", description="Node ID")
input_value: Any = Field(default="", description="Input value")
class PluginExecuteParas(ExecuteParas):
plugin_id: str = Field(default="", description="Plugin id")
tool_id: str = Field(default="", description="Tool id")
class UserInputParas(BaseParas):
inputs: UserInput = Field(default={}, description="Input parameters")
def get_error_info_in_wf_trace(mgr, chunk):
code = status.HTTP_200_OK
message = "Executed successfully"
if isinstance(mgr, WorkflowRunner) and chunk.get("type") == ExecuteResponseType.Trace.value:
error_info = chunk.get("payload", {}).get("error")
if isinstance(error_info, dict) and error_info:
error_code = error_info.get("error_code")
if error_code:
code = error_code
message = error_info.get("message")
return code, message
async def handler(
request_body: Union[ExecuteParas, UserInputParas],
request: Request,
mgr: Union[AgentRunner, WorkflowRunner],
current_user: Dict[str, Any]
) -> AsyncGenerator[str, None]:
try:
logger.info(f"in execute: {request_body}, {current_user}")
if isinstance(request_body.inputs, UserInput):
inputs = InteractiveInput()
inputs.update(request_body.inputs.node_id, request_body.inputs.input_value)
else:
inputs = request_body.inputs
session_id = " ".join(
[
id_val.strip()
for id_val in [request_body.space_id, request_body.conversation_id, get_session_id()]
if id_val and id_val.strip()
]
)
if session_id:
set_session_id(session_id)
async for chunk in mgr.run(request_body.id, request_body.version, inputs,
request_body.conversation_id, request_body.space_id, current_user):
if await request.is_disconnected():
raise HTTPException(status_code=404, detail="Disconnected")
logger.debug(f"Received chunk: {chunk}")
code, message = get_error_info_in_wf_trace(mgr, chunk)
yield ResponseModel(
code=code,
message=message,
data=chunk
).model_dump_json()
except JiuWenExecuteException as e:
log_exception(e)
error_node_info = ErrorNodeInfo(error_code=e.code, error_message=e.message,
node_id=e.node_id, connection=e.connection)
data = WorkflowErrorData(workflow_id=e.workflow_id, error_nodes_info=[error_node_info])
yield WorkflowFailedResponse(data=data, code=e.code, message=e.message).model_dump_json()
except JiuWenGraphException as e:
log_exception(e)
yield ResponseModel(
code=e.code,
message=e.message,
data=None
).model_dump_json()
except JiuWenComponentException as e:
log_exception(e)
yield ResponseModel(
code=e.code,
message=e.message,
data={
"component_id": e.component_id,
"component_type": e.component_type,
"error_stage": e.error_stage
}
).model_dump_json()
except BaseError as e:
log_exception(e)
message = e.message
if e.code == StatusCode.AGENT_TOOL_NOT_FOUND.code:
lang = _resolve_language(current_user)
if lang == "zh":
error_msg = e.params.get("error_msg", "") if e.params else ""
if not error_msg and "reason: " in message:
try:
error_msg = message.split("reason: ", 1)[1]
except IndexError:
pass
message = f"智能体工具未找到,原因: {error_msg}"
yield ResponseModel(
code=e.code,
message=message,
data=None
).model_dump_json()
except Exception as e:
log_exception(e)
safe_message = get_safe_error_message(e)
error_code = getattr(e, 'code', -1)
yield ResponseModel(
code=error_code,
message=safe_message,
data=None
).model_dump_json()
@execution_router.post("/agent", response_model=ResponseModel[dict])
async def execute_agent(
request_body: ExecuteParas,
request: Request,
current_user: Dict[str, Any] = Depends(get_current_user)
) -> EventSourceResponse:
"""Run an agent"""
try:
return EventSourceResponse(handler(request_body, request, agent_mgr, current_user))
except HTTPException:
raise
except Exception as e:
raise handle_http_exception(e, "Agent execution failed") from e
@execution_router.post("/workflow", response_model=ResponseModel[dict])
async def execute_workflow(
request_body: ExecuteParas,
request: Request,
current_user: Dict[str, Any] = Depends(get_current_user)
) -> EventSourceResponse:
"""Run a workflow"""
try:
return EventSourceResponse(handler(request_body, request, flow_mgr, current_user))
except HTTPException:
raise
except Exception as e:
raise handle_http_exception(e, "Workflow execution failed") from e
@execution_router.post("/userInput", response_model=ResponseModel[dict])
async def handle_workflow_user_input(
request_body: UserInputParas,
request: Request,
current_user: Dict[str, Any] = Depends(get_current_user)
) -> EventSourceResponse:
"""handle workflow user input"""
try:
return EventSourceResponse(handler(request_body, request, flow_mgr, current_user))
except HTTPException:
raise
except Exception as e:
raise handle_http_exception(e, "Failed to process workflow user input") from e
@execution_router.post("/agent/userInput", response_model=ResponseModel[dict])
async def handle_agent_user_input(
request_body: UserInputParas,
request: Request,
current_user: Dict[str, Any] = Depends(get_current_user)
) -> EventSourceResponse:
"""handle agent user input"""
try:
return EventSourceResponse(handler(request_body, request, agent_mgr, current_user))
except HTTPException:
raise
except Exception as e:
raise handle_http_exception(e, "Failed to process agent user input") from e
@execution_router.post("/component", response_model=ResponseModel[dict])
async def execute_component(
request_body: CompExecuteParas,
current_user: Dict[str, Any] = Depends(get_current_user)
) -> ResponseModel[Dict[str, Any]]:
"""Run a single component"""
try:
if isinstance(request_body.inputs, UserInput):
inputs = InteractiveInput()
inputs.update(request_body.inputs.node_id, request_body.inputs.input_value)
else:
inputs = request_body.inputs
logger.info(f"request_body: {request_body}")
result = await comp_executor.run(request_body.id, request_body.version, inputs, request_body.component_id,
request_body.space_id, current_user, request_body.loop_id,
request_body.conversation_id)
logger.info(f"Received result: {result}")
return ResponseModel(
code=status.HTTP_200_OK, message="Component Executed successfully", data=result
)
except JiuWenComponentException as e:
log_exception(e)
return ResponseModel(
code=e.code, message=e.message,
data={"type": "node", "payload": {"output": {"result": e.message, "is_success": False}}}
)
except JiuWenGraphException as e:
log_exception(e)
return ResponseModel(
code=e.code, message=e.message,
data={"type": "node", "payload": {"output": {"result": e.message, "is_success": False}}}
)
except BaseError as e:
log_exception(e)
return ResponseModel(
code=e.code, message=e.message,
data={"type": "node", "payload": {"output": {"result": e.message, "is_success": False}}}
)
except HTTPException:
raise
except Exception as e:
raise handle_http_exception(e, "Component execution failed") from e
@execution_router.post("/workflow/validate", response_model=ResponseModel[dict])
async def validate_workflow(
request_body: ExecuteParas,
request: Request,
current_user: Dict[str, Any] = Depends(get_current_user)
) -> ResponseModel[dict]:
"""validate workflow graph"""
try:
await flow_mgr.validate(request_body.id, request_body.version, request_body.space_id, current_user)
return ResponseModel(code=status.HTTP_200_OK, message="Workflow validate success", data={})
except HTTPException:
raise
except JiuWenGraphException as e:
logger.info(f"JiuWenGraphException: {repr(e)}")
return ResponseModel(
code=e.code,
message=e.message,
data={}
)
except JiuWenComponentException as e:
logger.info(f"JiuWenComponentException: {repr(e)}")
return ResponseModel(
code=e.code,
message=e.message,
data={
"component_id": e.component_id,
"component_type": e.component_type,
"error_stage": e.error_stage
}
)
except Exception as e:
raise handle_http_exception(e, "Workflow validation failed") from e
@execution_router.post("/plugin", response_model=ResponseModel[dict])
async def execute_plugin(
request_body: PluginExecuteParas,
current_user: Dict[str, Any] = Depends(get_current_user)
) -> ResponseModel[Dict[str, Any]]:
"""Run a plugin"""
try:
logger.info(f"in execute: {request_body}")
if isinstance(request_body.inputs, UserInput):
inputs = InteractiveInput()
inputs.update(request_body.inputs.node_id, request_body.inputs.input_value)
else:
inputs = request_body.inputs
version = "draft"
chunk = await plugin_mgr.run(request_body.plugin_id, request_body.tool_id, inputs, request_body.space_id,
version, current_user)
logger.warning(f"Received chunk: {chunk}, type: {type(chunk)}")
return ResponseModel(code=status.HTTP_200_OK, message="Executed successfully", data=chunk)
except JiuWenComponentException as e:
log_exception(e)
return ResponseModel(
code=e.code, message=e.message,
data={"type": "node", "payload": {"output": {"result": e.message, "is_success": False}}}
)
except JiuWenGraphException as e:
log_exception(e)
return ResponseModel(
code=e.code, message=e.message,
data={"type": "node", "payload": {"output": {"result": e.message, "is_success": False}}}
)
except BaseError as e:
log_exception(e)
return ResponseModel(
code=e.code, message=e.message,
data={"type": "node", "payload": {"output": {"result": e.message, "is_success": False}}}
)
except HTTPException:
raise
except Exception as e:
raise handle_http_exception(e, "Plugin execution failed") from e
@execution_router.post("/get_trace_summary_list", response_model=ResponseModel[list[TraceSummaryBrief]],
response_model_by_alias=False)
async def get_trace_summary_list(
request: Dict,
current_user: dict = Depends(get_current_user)
):
try:
req = validate_request(request, TraceSummaryListRequest)
_ = check_user_space(req.space_id, current_user)
res = trace_summary_repository.get_trace_summary_list(req.business_id, req.business_type)
return handle_response(res)
except ValidationError as e:
logger.error(f"Trace summary list retrieval failed, error: {e.errors()}")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Request validation failed") from e
@execution_router.post("/get_trace_summary_by_trace_id", response_model=ResponseModel[ExecutionLogSummary],
response_model_by_alias=False)
async def get_trace_summary_by_trace_id(
request: Dict,
current_user: dict = Depends(get_current_user)
):
try:
req = validate_request(request, TraceSummaryByTraceIdRequest)
_ = check_user_space(req.space_id, current_user)
res = trace_summary_repository.get_trace_summary_by_trace_id(req.trace_id)
return handle_response(res)
except ValidationError as e:
logger.error(f"Trace summary retrieval failed for trace ID, error: {e.errors()}")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Request validation failed") from e
@execution_router.post("/get_latest_trace_summary", response_model=ResponseModel[ExecutionLogSummary],
response_model_by_alias=False)
async def get_latest_trace_summary(
request: Dict,
current_user: dict = Depends(get_current_user)
):
try:
req = validate_request(request, TraceSummaryLatestRequest)
_ = check_user_space(req.space_id, current_user)
res = trace_summary_repository.get_latest_trace_summary(req.business_id, req.business_type)
return handle_response(res)
except ValidationError as e:
logger.error(f"Get latest trace summary failed, error: {e.errors()}")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Request validation failed") from e
@execution_router.post("/get_all_trace_summaries", response_model=ResponseModel[list[TraceSummaryBriefWithStatus]],
response_model_by_alias=False)
async def get_all_trace_summaries(
request: Dict,
current_user: dict = Depends(get_current_user)
):
try:
req = validate_request(request, TraceSummaryListBySpaceRequest)
_ = check_user_space(req.space_id, current_user)
res = trace_summary_repository.get_trace_summary_list_by_space(
req.space_id, req.business_type, req.limit
)
return handle_response(res)
except ValidationError as e:
logger.error(f"Get all trace summaries failed, error: {e.errors()}")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Request validation failed") from e
@execution_router.post("/list_active_executions", response_model=ResponseModel[list[ActiveExecutionInfo]],
response_model_by_alias=False)
async def list_active_executions(
request: Dict,
current_user: dict = Depends(get_current_user)
):
try:
space_id = request.get("space_id") or request.get("spaceId", "")
if space_id:
_ = check_user_space(space_id, current_user)
executions = workflow_execution_manager.list_executions()
data = []
workflow_ids = []
for conv_id, info in executions.items():
if space_id and info.space_id != space_id:
continue
data.append({
"conversation_id": info.conversation_id,
"workflow_id": info.workflow_id,
"workflow_version": info.workflow_version,
"space_id": info.space_id,
"start_time": info.start_time,
})
workflow_ids.append(info.workflow_id)
if workflow_ids:
try:
from openjiuwen_studio.models.workflow import WorkflowBaseDB
from openjiuwen_studio.core.manager.repositories.jiuwen_base_repository import get_db_jw
from sqlalchemy import select
with get_db_jw() as db:
stmt = select(WorkflowBaseDB.workflow_id, WorkflowBaseDB.name).where(
WorkflowBaseDB.workflow_id.in_(workflow_ids)
)
name_map = {row.workflow_id: row.name for row in db.execute(stmt).all()}
for item in data:
item["workflow_name"] = name_map.get(item["workflow_id"])
except Exception as e:
logger.warning(f"Failed to look up workflow names for active executions: {e}")
return ResponseModel(code=status.HTTP_200_OK, message="Active executions retrieved", data=data)
except Exception as e:
logger.error(f"List active executions failed: {e}")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Failed to list active executions") from e
@execution_router.post("/get_running_traces")
async def get_running_traces(
request: Dict,
current_user: dict = Depends(get_current_user)
):
"""Find running executions by checking TraceDetail for traces without a completed TraceSummary."""
try:
req = validate_request(request, TraceSummaryListBySpaceRequest)
_ = check_user_space(req.space_id, current_user)
res = trace_summary_repository.get_running_traces_by_space(req.space_id, req.business_type)
return handle_response(res)
except ValidationError as e:
logger.error(f"Get running traces failed, error: {e.errors()}")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Request validation failed") from e
@execution_router.post("/memory/get_user_variable", response_model=ResponseModel[dict])
async def search_variable_memory(
request: dict, current_user: dict = Depends(get_current_user)
) -> ResponseModel[Dict[str, Any]]:
req = GetUserVar(**request)
_ = check_user_space(req.user_id, current_user)
try:
data = await get_user_variable(req)
return ResponseModel(
code=status.HTTP_200_OK,
message="get user variable success",
data=data
)
except Exception as e:
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"get user variable failed: {str(e)}",
data=None
)
@execution_router.post("/memory/get_longterm_mem", response_model=ResponseModel[dict])
async def search_longterm_memory(
request: dict, current_user: dict = Depends(get_current_user)
) -> ResponseModel[Dict[str, Any]]:
req = SearchLongtermMem(**request)
_ = check_user_space(req.user_id, current_user)
try:
data = await get_longterm_mem(req)
return ResponseModel(
code=status.HTTP_200_OK,
message="get long term mem success",
data=data
)
except Exception as e:
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"Failed to retrieve long-term memory: {str(e)}",
data=None
)
@execution_router.post("/memory/delete_user_variable", response_model=ResponseModel[dict])
async def delete_variable_memory(
request: dict, current_user: dict = Depends(get_current_user)
) -> ResponseModel[Dict[str, Any]]:
req = DeleteVariable(**request)
_ = check_user_space(req.user_id, current_user)
try:
data = await delete_user_variable(req)
return ResponseModel(
code=status.HTTP_200_OK,
message="delete user variable success",
data=data
)
except Exception as e:
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"delete user variable failed: {str(e)}",
data=None
)
@execution_router.post("/memory/delete_longterm_mem", response_model=ResponseModel[dict])
async def delete_longterm_memory(
request: dict, current_user: dict = Depends(get_current_user)
) -> ResponseModel[Dict[str, Any]]:
req = DeleteLongtermMem(**request)
_ = check_user_space(req.user_id, current_user)
try:
data = await delete_longterm_mem(req)
return ResponseModel(
code=status.HTTP_200_OK,
message="delete long term memory success",
data=data
)
except Exception as e:
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"Failed to delete long-term memory: {str(e)}",
data=None
)
@execution_router.post("/memory/delete_longterm_mem_by_scope", response_model=ResponseModel[dict])
async def delete_longterm_mem_by_scope(
request: dict,
current_user: dict = Depends(get_current_user)
) -> ResponseModel[Dict[str, Any]]:
req = DeleteScopeLongtermMem(**request)
_ = check_user_space(req.user_id, current_user)
try:
data = await delete_longterm_mem_by_scope_id(req)
return ResponseModel(
code=status.HTTP_200_OK,
message="delete long term memory success",
data=data
)
except Exception as e:
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"Failed to delete long-term memory: {str(e)}",
data=None
)
@execution_router.post("/memory/update_user_variable", response_model=ResponseModel[dict])
async def update_variable_memory(
request: dict, current_user: dict = Depends(get_current_user)
) -> ResponseModel[Dict[str, Any]]:
req = UpdateVariable(**request)
_ = check_user_space(req.user_id, current_user)
try:
data = await update_user_variable(req)
return ResponseModel(
code=status.HTTP_200_OK,
message="update user variable success",
data=data
)
except Exception as e:
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"update user variable failed: {str(e)}",
data=None
)
@execution_router.post("/memory/update_longterm_mem", response_model=ResponseModel[dict])
async def update_longterm_memory(
request: dict, current_user: dict = Depends(get_current_user)
) -> ResponseModel[Dict[str, Any]]:
req = UpdateLongtermMem(**request)
_ = check_user_space(req.user_id, current_user)
try:
data = await update_longterm_mem(req)
return ResponseModel(
code=status.HTTP_200_OK,
message="update long term memory success",
data=data
)
except Exception as e:
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"update long term memory failed: {str(e)}",
data=None
)
@execution_router.post("/agent/reset", response_model=ResponseModel[dict])
async def reset_agent_instance(
request_body: ExecuteParas,
current_user: Dict[str, Any] = Depends(get_current_user)
) -> ResponseModel[Dict[str, Any]]:
"""
Reset agent instance, clear cache and rebuild
Args:
request_body: Request parameters containing agent_id, version, conversation_id, etc.
current_user: Current user information
Returns:
ResponseModel: Reset operation result
"""
try:
_ = check_user_space(request_body.space_id, current_user)
success = await agent_mgr.reset_agent_instance_cache(
conversation_id=request_body.conversation_id,
agent_id=request_body.id,
agent_version=request_body.version
)
result = "success" if success else "fail"
return ResponseModel(
code=status.HTTP_200_OK,
message=f"Agent instance reset {result}.",
data={}
)
except Exception as e:
log_exception(e)
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"Reset agent instance failed: {str(e)}",
data=None
)
@execution_router.post("/workflow/cancel", response_model=ResponseModel[dict])
async def cancel_workflow_execution(
request_body: BaseParas,
current_user: Dict[str, Any] = Depends(get_current_user)
) -> ResponseModel[Dict[str, Any]]:
"""
取消正在执行的工作流
Args:
request_body: 包含 conversation_id
current_user: 当前用户信息
Returns:
ResponseModel: 取消操作的结果
"""
try:
logger.info(f"Start to cancel workflow execution")
_ = check_user_space(request_body.space_id, current_user)
execution_info = workflow_execution_manager.get_execution(request_body.conversation_id)
if not execution_info:
return ResponseModel(
code=status.HTTP_404_NOT_FOUND,
message=f"Execution not found for conversation_id: {request_body.conversation_id}",
data=None
)
success = await workflow_execution_manager.cancel_execution(
conversation_id=request_body.conversation_id
)
if success:
return ResponseModel(
code=status.HTTP_200_OK,
message="Workflow execution cancelled successfully",
data={
"conversation_id": request_body.conversation_id,
"cancelled": True
}
)
else:
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message="Failed to cancel workflow execution",
data=None
)
except Exception as e:
log_exception(e)
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"Cancel workflow execution failed: {str(e)}",
data=None
)
@execution_router.post("/component/cancel", response_model=ResponseModel[dict])
async def cancel_component_execution(
request_body: CompExecuteParas,
current_user: Dict[str, Any] = Depends(get_current_user)
) -> ResponseModel[Dict[str, Any]]:
"""
取消正在执行的单节点
"""
try:
_ = check_user_space(request_body.space_id, current_user)
execution_id = f"{request_body.id}:{request_body.component_id}:{request_body.conversation_id}"
exec_info = component_execution_manager.get_execution(execution_id)
if not exec_info:
return ResponseModel(
code=status.HTTP_404_NOT_FOUND,
message=f"Component execution not found for {execution_id}",
data=None
)
success = await component_execution_manager.cancel_execution(execution_id=execution_id)
if success:
return ResponseModel(
code=status.HTTP_200_OK,
message="Component execution cancelled successfully",
data={
"workflow_id": request_body.id,
"component_id": request_body.component_id,
"conversation_id": request_body.conversation_id,
"cancelled": True,
"warning": "component execution cancel by user"
}
)
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message="Failed to cancel component execution",
data=None
)
except Exception as e:
log_exception(e)
return ResponseModel(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message=f"Cancel component execution failed: {str(e)}",
data=None
)