"""MCP Agent执行器"""
import json
import logging
import platform
import re
import uuid
from datetime import UTC, datetime
from pathlib import Path
from textwrap import dedent
from typing import Any
from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment
from mcp.types import TextContent
from pydantic import Field, ValidationError
from apps.common.config import config
from apps.constants import AGENT_MAX_STEPS
from apps.models import ExecutorHistory, ExecutorStatus, LanguageType, MCPTools, StepStatus
from apps.models.task import ExecutorCheckpoint
from apps.scheduler.executor.base import BaseExecutor
from apps.scheduler.mcp_agent.func import READ_TOOL_FUNCTION, UPDATE_TOOL_FUNCTION
from apps.scheduler.mcp_agent.host import MCPHost
from apps.scheduler.mcp_agent.plan import MCPPlanner
from apps.scheduler.pool.mcp.pool import mcp_pool
from apps.schemas.enum_var import EventType
from apps.schemas.llm import LLMToolCall
from apps.schemas.mcp import MCPRiskConfirm
from apps.schemas.task import AgentCheckpointExtra, AgentHistoryExtra, TaskData
from apps.services.user import UserManager
# Module-level logger named after this module.
_logger = logging.getLogger(__name__)
# Tool names that _execute_pending_histories runs without a risk-confirmation
# round trip. NOTE(review): presumably these match the "name" entries of
# UPDATE_TOOL_FUNCTION / READ_TOOL_FUNCTION — confirm against apps.scheduler.mcp_agent.func.
IGNORE_RISK_TOOL = {"update_todo_list", "read_todo_list"}
class MCPAgentPrompt:
"""MCP Agent提示词处理器"""
def __init__(self, task: TaskData, role: str = "main") -> None:
"""初始化MCP Agent提示词处理器"""
self.task = task
self.role = role
self.language: LanguageType = task.runtime.language
self._env = SandboxedEnvironment(
loader=BaseLoader(),
autoescape=False,
trim_blocks=True,
lstrip_blocks=True,
)
self._prompt_base_dir = Path(config.deploy.data_dir) / "prompts" / "system"
def load_prompt(self, prompt_id: str, prompt_type: str = "part") -> str:
"""加载提示词"""
prompt_file = self._prompt_base_dir / prompt_type / f"{prompt_id}.{self.language.value}.txt"
if not prompt_file.exists():
err = f"[MCPAgentPrompt] 提示词文件不存在: {prompt_file}"
_logger.error(err)
raise FileNotFoundError(err)
return prompt_file.read_text(encoding="utf-8")
def _format_env(self) -> str:
"""格式化环境变量"""
os_version = f"{platform.system()} {platform.release()}"
os_arch = platform.machine()
current_date = datetime.now(tz=UTC).astimezone().strftime("%Y-%m-%d")
user_id = self.task.metadata.userId
env_prompt_template = self.load_prompt("env")
template = self._env.from_string(env_prompt_template)
return template.render(
os_version=os_version,
os_arch=os_arch,
current_date=current_date,
user_id=user_id,
)
def format_tools(self, tools: dict[str, MCPTools]) -> str:
"""格式化工具列表"""
tool_prompt_template = self.load_prompt("tool")
sorted_tools = dict(sorted(tools.items(), key=lambda x: x[0]))
template = self._env.from_string(tool_prompt_template)
return template.render(tools=sorted_tools)
def format(self, template: str, tools: dict[str, MCPTools]) -> str:
"""格式化提示词模板"""
result = template
pattern = r"\{([^.}]+)\.([^}]+)\}"
matches = re.findall(pattern, template)
for prompt_type, prompt_id in matches:
placeholder = f"{{{prompt_type}.{prompt_id}}}"
if prompt_id == "env":
replacement = self._format_env()
elif prompt_id == "tool":
replacement = self.format_tools(tools)
else:
replacement = self.load_prompt(prompt_id, prompt_type=prompt_type)
result = result.replace(placeholder, replacement)
return dedent(result.strip())
class MCPAgentExecutor(BaseExecutor):
    """Executor that runs an MCP agent: it calls the LLM, then executes the tool calls it returns."""

    # A factory (not a pre-computed value) so that every executor created
    # without an explicit ID gets its own UUID instead of sharing the single
    # one generated when the class body was evaluated.
    agent_id: uuid.UUID = Field(default_factory=uuid.uuid4, description="App ID作为Agent ID")
    agent_description: str = Field(default="", description="Agent描述")
    # Extra parameters supplied while the flow is running; read by
    # handle_waiting_status (risk confirmation) and handle_param_status.
    params: dict[str, Any] | None = Field(
        default=None,
        description="流执行过程中的参数补充",
        alias="params",
    )
async def init(self) -> None:
    """Prepare the executor's in-memory state before ``run`` is called.

    Resets the per-run working variables, loads the user, creates the planner,
    host and prompt helpers, and either creates a fresh executor checkpoint or
    restores counters from the existing one.

    Raises:
        RuntimeError: If the user referenced by the task does not exist.
    """
    if not self.question:
        self.question = self.task.runtime.userInput
    self._mcp_list: list = []
    self._tool_list: dict[str, MCPTools] = {}
    self._system_prompt = ""
    self._step_tool_call_id = ""
    self._current_step_count = 0
    self._current_todo_list = ""

    user_id = self.task.metadata.userId
    user = await UserManager.get_user(user_id)
    if not user:
        # Include the ID in the message itself: the same text is logged and raised.
        err = f"[MCPAgentExecutor] 用户不存在: {user_id}"
        _logger.error(err)
        raise RuntimeError(err)
    self._user = user

    self._planner = MCPPlanner(self.task)
    self._host = MCPHost(self.task)
    self._prompt_mgmt = MCPAgentPrompt(self.task)

    if not self.task.state:
        user_input = self.task.runtime.userInput or ""
        self.task.state = ExecutorCheckpoint(
            taskId=self.task.metadata.id,
            appId=self.agent_id,
            executorId=str(uuid.uuid4()),
            # The executor is named after the first 20 characters of the input.
            executorName=user_input[:20],
            executorStatus=ExecutorStatus.INIT,
            stepId=uuid.uuid4(),
            stepName="",
            stepStatus=StepStatus.INIT,
            stepType="",
        )
    else:
        await self._restore_checkpoint()
async def _restore_checkpoint(self) -> None:
    """Copy step count, todo list and token counters from the checkpoint into memory.

    Does nothing when there is no state or no extra data. Any failure while
    restoring is logged and swallowed, so the executor keeps its current values.
    """
    state = self.task.state
    if not (state and state.extraData):
        return
    try:
        saved = AgentCheckpointExtra.model_validate(state.extraData)
        self._current_step_count = saved.step_count
        self._current_todo_list = saved.todo_list
        self.llm.input_tokens = saved.input_token
        self.llm.output_tokens = saved.output_token
        _logger.info("[MCPAgentExecutor] 从checkpoint恢复数据")
    except Exception:
        _logger.exception("[MCPAgentExecutor] 恢复checkpoint失败")
async def create_tool_list(self, query: str) -> dict[str, MCPTools]:
    """Select tools for *query* and add the two built-in todo-list tools.

    Args:
        query: Text passed to the host's tool selection.

    Returns:
        Mapping of tool name to tool, containing the selected tools plus the
        update-todo and read-todo tools for the task's language.
    """
    _logger.info("[MCPAgentExecutor] 创建工具列表")
    selected = await self._host.select_tools(query=query, mcp_list=self._mcp_list, top_n=20)

    # (function table, prompt file id) for each built-in tool, in insertion order.
    builtin_tools = (
        (UPDATE_TOOL_FUNCTION, "update_todo_list"),
        (READ_TOOL_FUNCTION, "read_todo_list"),
    )
    for function_table, prompt_id in builtin_tools:
        spec = function_table[self.task.runtime.language]
        selected[spec["name"]] = MCPTools(
            mcpId="",
            toolName=spec["name"],
            description=self._prompt_mgmt.load_prompt(prompt_id, "func"),
            inputSchema=spec["parameters"],
            outputSchema=spec["output"],
        )
    return selected
def _update_last_context_status(self, step_status: StepStatus, output_data: dict | None = None) -> None:
    """Update the newest history entry, but only if it belongs to the current step.

    Args:
        step_status: Status to store on the entry.
        output_data: Output to store on the entry; left untouched when ``None``.

    Raises:
        RuntimeError: If the task has no state.
    """
    state = self.task.state
    if not state:
        err = "[MCPAgentExecutor] 任务状态不存在"
        _logger.error(err)
        raise RuntimeError(err)

    if not self.task.context:
        return
    newest = self.task.context[-1]
    if newest.stepId != state.stepId:
        return
    newest.stepStatus = step_status
    if output_data is not None:
        newest.outputData = output_data
async def _finish_message(
    self,
    role: str,
    step_status: StepStatus,
    input_data: dict | None = None,
    output_data: dict | None = None,
) -> None:
    """Push the message event for *role* and append the message to the task history.

    Args:
        role: Message role; ``"assistant"`` and ``"tool"`` also emit an event.
        step_status: Status recorded on the history entry.
        input_data: Input payload; for assistants the ``"assistant"`` key holds the text.
        output_data: Output payload; pushed as the step output for tools.

    Raises:
        RuntimeError: If the task has no state.
    """
    if not self.task.state:
        err = "[MCPAgentExecutor] 任务状态不存在"
        _logger.error(err)
        raise RuntimeError(err)

    is_tool = role == "tool"
    if is_tool:
        await self._push_message(EventType.STEP_OUTPUT, output_data or {})
    elif role == "assistant":
        # Empty assistant text produces no event.
        text = (input_data or {}).get("assistant", "")
        if text:
            await self._push_message(EventType.TEXT_ADD, text)

    # Only tool messages carry the id of the tool call they answer.
    extra = AgentHistoryExtra(
        role=role,
        tool_call_id=self._step_tool_call_id if is_tool else None,
    )
    record = ExecutorHistory(
        taskId=self.task.metadata.id,
        stepId=self.task.state.stepId,
        stepName=self.task.state.stepName,
        stepType="",
        stepStatus=step_status,
        inputData=input_data or {},
        outputData=output_data or {},
        extraData=extra.model_dump(),
    )
    self.task.context.append(record)
async def handle_waiting_status(self) -> bool:
    """Decide, from ``self.params``, whether the user approved the waiting tool.

    Returns:
        ``True`` when the user confirmed; the history entry of the current step
        is then flagged as risk-confirmed. ``False`` when there are no params,
        they cannot be parsed as a risk confirmation, or the user declined; the
        task is cancelled in those cases.
    """

    async def _abort() -> None:
        """Mark executor and step as cancelled and notify the client."""
        state = self.task.state
        if not state:
            return
        state.executorStatus = ExecutorStatus.CANCELLED
        state.stepStatus = StepStatus.CANCELLED
        await self._push_message(EventType.STEP_OUTPUT, data={})
        self._update_last_context_status(StepStatus.CANCELLED)

    if not self.params:
        _logger.warning("[MCPAgentExecutor] 无参数,取消任务")
        await _abort()
        return False

    try:
        decision = MCPRiskConfirm.model_validate(self.params)
    except ValidationError as exc:
        _logger.warning("[MCPAgentExecutor] 解析风险确认参数失败: %s, 取消执行", exc)
        await _abort()
        return False

    if not decision.confirm:
        _logger.info("[MCPAgentExecutor] 用户拒绝执行,取消任务")
        await _abort()
        return False

    if self.task.state:
        self.task.state.stepStatus = StepStatus.RUNNING

    # Flag the first history entry that belongs to the current step.
    for record in self.task.context:
        if record.stepId != self.task.state.stepId:
            continue
        extra = AgentHistoryExtra.model_validate(record.extraData) if record.extraData else AgentHistoryExtra()
        extra.risk_confirmed = True
        record.extraData = extra.model_dump()
        _logger.info("[MCPAgentExecutor] 标记工具 %s 风险已确认", record.stepName)
        break
    return True
async def handle_update_todo_step(
    self,
    tool_arguments: dict[str, Any],
    tool_history: ExecutorHistory,
) -> None:
    """Store the todo list passed by the model's function call.

    Args:
        tool_arguments: Function-call arguments; ``"todo_list"`` holds the new list.
        tool_history: History entry of this tool call, updated with the result.

    Raises:
        RuntimeError: If the task has no state.
    """
    state = self.task.state
    if not state:
        err = "[MCPAgentExecutor] 任务状态不存在"
        _logger.error(err)
        raise RuntimeError(err)

    _logger.info("[MCPAgentExecutor] 准备更新todo list")
    state.stepStatus = StepStatus.RUNNING
    await self._push_message(EventType.STEP_INPUT, tool_arguments)

    new_todo = tool_arguments.get("todo_list", "")
    if not new_todo:
        # An empty list is still stored; the warning only records that it happened.
        _logger.warning("[MCPAgentExecutor] todo_list参数为空")
    self._current_todo_list = new_todo

    result = {"status": "success"}
    state.stepStatus = StepStatus.SUCCESS
    tool_history.stepStatus = StepStatus.SUCCESS
    tool_history.outputData = result
    await self._push_message(EventType.STEP_OUTPUT, result)
async def handle_read_todo_step(self, tool_history: ExecutorHistory) -> None:
    """Return the in-memory todo list as the output of the read-todo tool.

    Args:
        tool_history: History entry of this tool call, updated with the result.

    Raises:
        RuntimeError: If the task has no state.
    """
    state = self.task.state
    if not state:
        err = "[MCPAgentExecutor] 任务状态不存在"
        _logger.error(err)
        raise RuntimeError(err)

    _logger.info("[MCPAgentExecutor] 读取todo list")
    state.stepStatus = StepStatus.RUNNING
    # The tool takes no input, so an empty payload is reported.
    await self._push_message(EventType.STEP_INPUT, {})

    result = {"todo": self._current_todo_list}
    state.stepStatus = StepStatus.SUCCESS
    tool_history.stepStatus = StepStatus.SUCCESS
    tool_history.outputData = result
    await self._push_message(EventType.STEP_OUTPUT, result)
async def _handle_special_tool(
    self,
    selected_tool: MCPTools,
    tool_arguments: dict[str, Any],
    tool_history: ExecutorHistory,
) -> bool:
    """Run the tool locally if it is one of the built-in todo-list tools.

    Args:
        selected_tool: Tool about to be executed.
        tool_arguments: Arguments of the tool call.
        tool_history: History entry of the tool call.

    Returns:
        ``True`` when the tool was handled here, ``False`` otherwise.
    """
    tool_name = selected_tool.toolName
    language = self.task.runtime.language

    if tool_name == UPDATE_TOOL_FUNCTION[language]["name"]:
        await self.handle_update_todo_step(tool_arguments, tool_history)
        return True
    if tool_name == READ_TOOL_FUNCTION[language]["name"]:
        await self.handle_read_todo_step(tool_history)
        return True
    return False
async def _check_and_confirm_risk(
    self,
    selected_tool: MCPTools,
    tool_params: dict[str, Any],
) -> bool:
    """Decide whether the tool must wait for a risk confirmation from the user.

    Args:
        selected_tool: Tool about to be executed.
        tool_params: Arguments the tool would be called with.

    Returns:
        ``True`` when a confirmation request was sent and execution must pause,
        ``False`` when the tool may run now.

    Raises:
        RuntimeError: If the task has no state.
    """
    if self._user.autoExecute:
        return False

    state = self.task.state
    if not state:
        err = "[MCPAgentExecutor] 任务状态不存在"
        _logger.error(err)
        raise RuntimeError(err)

    # handle_waiting_status flags the history entry of the *current step*, which
    # is not necessarily the newest entry: when several tool calls are pending,
    # later ones sit behind it. Look the entry up by step ID rather than
    # inspecting only the last entry, otherwise a confirmed tool would be sent
    # back for confirmation on every resume.
    for history in reversed(self.task.context):
        if history.stepId != state.stepId or history.stepName != selected_tool.toolName:
            continue
        if (
            history.extraData
            and AgentHistoryExtra.model_validate(history.extraData).risk_confirmed is True
        ):
            _logger.info("[MCPAgentExecutor] 工具 %s 已确认风险,跳过风险检查", selected_tool.toolName)
            return False

    _logger.info("[MCPAgentExecutor] autoExecute=False,需要确认工具风险")
    confirm_message = await self._planner.get_tool_risk(selected_tool, tool_params)

    state.executorStatus = ExecutorStatus.WAITING
    state.stepStatus = StepStatus.WAITING
    await self._push_message(
        EventType.STEP_WAITING_FOR_START,
        confirm_message.model_dump(exclude_none=True, by_alias=True),
    )
    return True
async def _execute_single_tool(
    self,
    tool_call: LLMToolCall,
    selected_tool: MCPTools,
    tool_history: ExecutorHistory,
) -> None:
    """Execute one tool call and record its result on *tool_history*.

    Built-in todo-list tools are handled locally; every other tool is called
    through its MCP client. The text parts of the result are concatenated and
    parsed as JSON. The stored output is always a dict: a JSON object is used
    as is, anything else is wrapped under the ``"message"`` key.

    Args:
        tool_call: Tool call whose arguments are passed to the tool.
        selected_tool: Tool to execute.
        tool_history: History entry updated with status and output.

    Raises:
        RuntimeError: If the task has no state, or the tool reports an error
            (the message is the concatenated error text).
    """
    tool_params = tool_call.arguments
    _logger.info("[MCPAgentExecutor] 执行工具: %s, 参数: %s", selected_tool.toolName, tool_params)
    if not self.task.state:
        err = "[MCPAgentExecutor] 任务状态不存在"
        _logger.error(err)
        raise RuntimeError(err)

    if await self._handle_special_tool(selected_tool, tool_params, tool_history):
        return

    mcp_client = await mcp_pool.get(selected_tool.mcpId, self.task.metadata.userId)
    output_data = await mcp_client.call_tool(selected_tool.toolName, tool_params)

    # Only text content is used; other content kinds are ignored.
    text = "".join(item.text for item in output_data.content if isinstance(item, TextContent))

    if output_data.isError:
        _logger.error("[MCPAgentExecutor] 工具 %s 执行失败: %s", selected_tool.toolName, text)
        raise RuntimeError(text)

    try:
        decoded_message = json.loads(text)
    except json.JSONDecodeError:
        _logger.warning("[MCPAgentExecutor] 工具输出不是有效的JSON,使用原始字符串")
        decoded_message = {"message": text}
    else:
        # Valid JSON may still be a list or a scalar; history output and step
        # events are dict payloads everywhere else, so wrap those values too.
        if not isinstance(decoded_message, dict):
            decoded_message = {"message": decoded_message}

    _logger.info("[MCPAgentExecutor] 工具 %s 执行成功", selected_tool.toolName)
    tool_history.stepStatus = StepStatus.SUCCESS
    tool_history.outputData = decoded_message
    await self._push_message(EventType.STEP_OUTPUT, decoded_message)
def _assemble_user_prompt(self) -> str:
    """Build the user message: the user's input followed by any alert prompts.

    The user's text is inserted verbatim. It is deliberately not run through
    the prompt placeholder expansion: input that merely contains something
    like ``{a.b}`` would otherwise be treated as a reference to a prompt file
    and either fail with ``FileNotFoundError`` or pull prompt, environment or
    tool text into the message.

    Returns:
        The stripped and dedented message. The max-steps alert is appended when
        the step count has reached ``AGENT_MAX_STEPS``; the empty-todo alert is
        appended when the todo list is empty or blank.

    Raises:
        RuntimeError: If the task has no state.
        FileNotFoundError: If a required alert prompt file is missing.
    """
    if not self.task.state:
        err = "[MCPAgentExecutor] 任务状态不存在"
        _logger.error(err)
        raise RuntimeError(err)

    user_prompt = self.task.runtime.userInput or ""

    if self._current_step_count >= AGENT_MAX_STEPS:
        _logger.warning("[MCPAgentExecutor] 当前步骤数 %d 已达到最大步骤数 %d",
                      self._current_step_count, AGENT_MAX_STEPS)
        alert = self._prompt_mgmt.load_prompt("max_step_reached", prompt_type="alert")
        user_prompt += f"\n{alert}\n"

    if not self._current_todo_list or self._current_todo_list.strip() == "":
        _logger.warning("[MCPAgentExecutor] 当前todo_list为空,添加todo_empty提示")
        alert = self._prompt_mgmt.load_prompt("todo_empty", prompt_type="alert")
        user_prompt += f"\n{alert}\n"

    return dedent(user_prompt.strip())
async def _add_initial_user_message_if_needed(self) -> None:
    """Append a user message when the history is empty or ends with an assistant message.

    When a message is added, the task's user input is set to ``self.question``
    and the step counter is reset to 1 before the prompt is assembled.
    """
    history = self.task.context
    if history:
        # A non-empty history only needs a new user message after an assistant turn.
        raw_extra = history[-1].extraData
        if not raw_extra:
            return
        if AgentHistoryExtra.model_validate(raw_extra).role != "assistant":
            return

    _logger.info("[MCPAgentExecutor] 添加初始 user 消息")
    self.task.runtime.userInput = self.question
    self._current_step_count = 1
    await self._finish_message(
        role="user",
        step_status=StepStatus.SUCCESS,
        input_data={"user": self._assemble_user_prompt()},
    )
async def _call_llm_and_create_pending_histories(self) -> list[ExecutorHistory]:
    """Call the LLM once and turn its tool calls into history entries.

    The assistant text is saved as an assistant message. Every tool call gets a
    history entry: calls to known tools are stored as WAITING and returned,
    calls to unknown tools are stored as ERROR and not returned. When the LLM
    returns no tool call, the executor status becomes SUCCESS.

    Returns:
        The WAITING history entries, in the order of the tool calls.

    Raises:
        RuntimeError: If the task has no state.
    """
    state = self.task.state
    if not state:
        err = "[MCPAgentExecutor] 任务状态不存在"
        _logger.error(err)
        raise RuntimeError(err)

    self._current_step_count += 1
    _logger.info("[MCPAgentExecutor] 更新步骤计数: %d", self._current_step_count)

    step_text_output, tool_calls = await self._host.call_llm_and_parse_tools(
        task=self.task,
        llm=self.llm,
        tool_list=self._tool_list,
        system_prompt=self._system_prompt,
    )
    await self._finish_message(
        role="assistant",
        step_status=StepStatus.SUCCESS,
        input_data={"assistant": step_text_output},
    )

    if not tool_calls:
        _logger.warning("[MCPAgentExecutor] LLM未返回工具调用")
        self.task.state.executorStatus = ExecutorStatus.SUCCESS
        return []

    runnable: list[ExecutorHistory] = []
    for call in tool_calls:
        is_known = call.name in self._tool_list
        if is_known:
            status, output = StepStatus.WAITING, {}
        else:
            _logger.error("[MCPAgentExecutor] 工具 %s 不存在于工具列表中,状态设为ERROR", call.name)
            status = StepStatus.ERROR
            output = {"error": f"工具 {call.name} 不存在于工具列表中"}

        record = ExecutorHistory(
            taskId=self.task.metadata.id,
            # Each tool call is its own step.
            stepId=uuid.uuid4(),
            stepName=call.name,
            stepType="",
            stepStatus=status,
            inputData=call.arguments,
            outputData=output,
            extraData=AgentHistoryExtra(role="tool", tool_call_id=call.id).model_dump(),
        )
        self.task.context.append(record)
        if is_known:
            runnable.append(record)
    return runnable
def _find_pending_histories_from_context(self) -> list[ExecutorHistory]:
    """Return the unbroken run of WAITING entries at the end of the history.

    Returns:
        The trailing WAITING entries in their original (oldest first) order;
        empty when the newest entry is not WAITING.
    """
    _logger.info("[MCPAgentExecutor] resume=True,从history中查找WAITING状态的记录")
    trailing: list[ExecutorHistory] = []
    # Walk backwards until the first entry that is not waiting.
    for record in reversed(self.task.context):
        if record.stepStatus != StepStatus.WAITING:
            break
        trailing.append(record)
    trailing.reverse()
    return trailing
async def _execute_pending_histories(self, pending_histories: list[ExecutorHistory]) -> None:
    """Execute the pending tool calls in order.

    For each entry the current step in the task state is switched to that
    entry. Execution stops at the first tool that needs a risk confirmation.
    A tool that is missing from the tool list, or that fails, is marked ERROR
    and the loop moves on to the next entry.

    Args:
        pending_histories: History entries of the tool calls to execute.

    Raises:
        RuntimeError: If the task has no state.
    """
    if not self.task.state:
        err = "[MCPAgentExecutor] 任务状态不存在"
        _logger.error(err)
        raise RuntimeError(err)

    _logger.info("[MCPAgentExecutor] 开始执行 %d 个待执行的工具", len(pending_histories))

    for history in pending_histories:
        tool_name = history.stepName
        tool_arguments = history.inputData

        selected_tool = self._tool_list.get(tool_name)
        if not selected_tool:
            _logger.error("[MCPAgentExecutor] 工具 %s 不存在于工具列表中,状态改为ERROR", tool_name)
            history.stepStatus = StepStatus.ERROR
            history.outputData = {"error": f"工具 {tool_name} 不存在于工具列表中"}
            continue

        tool_call_id = ""
        if history.extraData:
            history_extra = AgentHistoryExtra.model_validate(history.extraData)
            tool_call_id = history_extra.tool_call_id or ""

        # Make this entry the current step before any event is pushed for it.
        self.task.state.stepId = history.stepId
        self.task.state.stepName = selected_tool.toolName
        self.task.state.stepStatus = StepStatus.INIT
        self._step_tool_call_id = tool_call_id

        if (
            selected_tool.toolName not in IGNORE_RISK_TOOL
            and await self._check_and_confirm_risk(selected_tool, tool_arguments)
        ):
            _logger.info("[MCPAgentExecutor] 工具 %s 需要风险确认,暂停执行", tool_name)
            break

        tool_call = LLMToolCall(
            id=tool_call_id,
            name=tool_name,
            arguments=tool_arguments,
        )
        try:
            await self._execute_single_tool(tool_call, selected_tool, history)
        except Exception as e:
            _logger.exception("[MCPAgentExecutor] 执行工具 %s 时发生错误", tool_name)
            # Record the failure first, so the entry never stays WAITING even
            # if stopping the client fails as well.
            self.task.state.stepStatus = StepStatus.ERROR
            self.task.state.errorMessage = {
                "err_msg": str(e),
                "data": tool_arguments,
            }
            history.stepStatus = StepStatus.ERROR
            history.outputData = {"error": str(e)}
            # Stopping the client is best-effort cleanup; a failure here must
            # not abort the remaining tool calls.
            try:
                await mcp_pool.stop(selected_tool.mcpId, self.task.metadata.userId)
            except Exception:
                _logger.exception("[MCPAgentExecutor] 停止MCP客户端时发生错误")
async def run_step(self, *, resume: bool = False) -> None:
    """Run one agent step.

    Args:
        resume: When ``True``, execute the WAITING entries already in the
            history instead of calling the LLM for new tool calls.

    Raises:
        RuntimeError: If the task has no state.
    """
    state = self.task.state
    if not state:
        err = "[MCPAgentExecutor] 任务状态不存在"
        _logger.error(err)
        raise RuntimeError(err)

    state.executorStatus = ExecutorStatus.RUNNING
    if resume:
        to_execute = self._find_pending_histories_from_context()
    else:
        to_execute = await self._call_llm_and_create_pending_histories()
    await self._execute_pending_histories(to_execute)
async def generate_params_with_null(self) -> None:
    """Ask the user to supply the parameters the last tool call was missing.

    Uses the newest history entry as the failed tool call, asks the planner
    which parameters are missing, then puts the executor into WAITING / PARAM
    and pushes the request to the client.

    Raises:
        RuntimeError: If the task has no state, the history is empty, or the
            tool of the newest entry is not in the tool list.
    """
    if not self.task.state:
        err = "[MCPAgentExecutor] 任务状态不存在"
        _logger.error(err)
        raise RuntimeError(err)
    if not self.task.context:
        err = "[MCPAgentExecutor] 任务上下文为空,无法获取工具信息"
        _logger.error(err)
        raise RuntimeError(err)

    failed_call = self.task.context[-1]
    tool_name = failed_call.stepName
    current_tool = self._tool_list.get(tool_name)
    if not current_tool:
        err = f"[MCPAgentExecutor] 在tool_list中未找到工具: {tool_name}"
        _logger.error(err)
        raise RuntimeError(err)

    params_with_null = await self._planner.get_missing_param(
        current_tool,
        self.task.context[-1].inputData,
        self.task.state.errorMessage,
    )

    payload = {
        "message": self.task.state.errorMessage["err_msg"],
        "params": params_with_null,
    }
    self.task.state.executorStatus = ExecutorStatus.WAITING
    self.task.state.stepStatus = StepStatus.PARAM
    await self._push_message(EventType.STEP_WAITING_FOR_PARAM, payload)
async def handle_param_status(self, history: ExecutorHistory) -> bool:
    """Merge user-supplied parameters into a tool call that was waiting for them.

    Args:
        history: History entry whose input data receives the parameters.

    Returns:
        ``True`` when parameters were merged and the state was set to
        WAITING / RUNNING; ``False`` when no parameters were supplied and the
        task was cancelled.

    Raises:
        RuntimeError: If the task has no state.
    """
    state = self.task.state
    if not state:
        err = "[MCPAgentExecutor] 任务状态不存在"
        _logger.error(err)
        raise RuntimeError(err)

    if self.params:
        history.inputData.update(self.params)
        state.stepStatus = StepStatus.WAITING
        state.executorStatus = ExecutorStatus.RUNNING
        _logger.info("[MCPAgentExecutor] PARAM状态,参数已整合,状态改为WAITING")
        return True

    # No parameters at all is treated as the user giving up on the task.
    _logger.warning("[MCPAgentExecutor] 用户没有输入参数,取消任务")
    state.executorStatus = ExecutorStatus.CANCELLED
    state.stepStatus = StepStatus.CANCELLED
    await self._push_message(EventType.STEP_OUTPUT, data={})
    self._update_last_context_status(StepStatus.CANCELLED)
    return False
def _save_checkpoint(self) -> None:
    """Write step count, todo list and token counters into the checkpoint's extra data.

    Raises:
        RuntimeError: If the task has no state.
    """
    state = self.task.state
    if not state:
        err = "[MCPAgentExecutor] 任务状态不存在"
        _logger.error(err)
        raise RuntimeError(err)

    snapshot = AgentCheckpointExtra(
        step_count=self._current_step_count,
        todo_list=self._current_todo_list,
        input_token=self.llm.input_tokens,
        output_token=self.llm.output_tokens,
    )
    state.extraData = snapshot.model_dump()

    # Only the first 50 characters of the todo list go into the log line.
    todo_preview = self._current_todo_list[:50] if self._current_todo_list else ""
    _logger.debug(
        "[MCPAgentExecutor] 保存checkpoint: step_count=%d, todo_list=%s, input_token=%d, output_token=%d",
        self._current_step_count,
        todo_preview,
        self.llm.input_tokens,
        self.llm.output_tokens,
    )
async def run(self) -> None:
    """Run the agent until the executor leaves the RUNNING status.

    Builds the tool list and system prompt, adds the initial user message if
    needed, then loops: a newest history entry in PARAM status is handled as a
    parameter submission, one in WAITING status as a risk confirmation followed
    by a resumed step, anything else triggers a normal step. Afterwards every
    MCP service in the list is stopped and the checkpoint is saved.

    Raises:
        RuntimeError: If the task has no state.
    """
    if not self.task.state:
        err = "[MCPAgentExecutor] 任务状态不存在"
        _logger.error(err)
        raise RuntimeError(err)

    _logger.info("[MCPAgentExecutor] 创建工具列表并选择工具")
    self._tool_list = await self.create_tool_list(query=self.task.runtime.userInput)

    _logger.info("[MCPAgentExecutor] 创建系统提示词")
    system_template = r"""
{role.main}
{part.todo}
{part.final}
{part.tool}
{part.env}
"""
    self._system_prompt = self._prompt_mgmt.format(template=system_template, tools=self._tool_list)

    await self._add_initial_user_message_if_needed()
    self.task.state.executorStatus = ExecutorStatus.RUNNING

    while self.task.state.executorStatus == ExecutorStatus.RUNNING:
        newest = self.task.context[-1] if len(self.task.context) > 0 else None
        # The status is read once per iteration; changes made while handling it
        # are only seen on the next iteration.
        newest_status = newest.stepStatus if newest is not None else None

        if newest_status == StepStatus.PARAM:
            _logger.info("[MCPAgentExecutor] 检测到PARAM状态,处理参数确认")
            if not await self.handle_param_status(newest):
                break
        elif newest_status == StepStatus.WAITING:
            _logger.info("[MCPAgentExecutor] 检测到WAITING状态,等待用户批准")
            approved = await self.handle_waiting_status()
            if not approved:
                break
            _logger.info("[MCPAgentExecutor] 用户批准执行,恢复执行待审批的工具")
            await self.run_step(resume=True)
            continue

        _logger.info("[MCPAgentExecutor] 运行Agent的一个Step")
        await self.run_step()

    # Best-effort shutdown: a client that fails to stop must not prevent the
    # others from stopping or the checkpoint from being saved.
    for mcp_service in self._mcp_list:
        try:
            await mcp_pool.stop(mcp_service.id, self.task.metadata.userId)
        except Exception:
            _logger.exception("[MCPAgentExecutor] 停止MCP客户端时发生错误")

    self._save_checkpoint()