"""MCP工具"""
import logging
from collections.abc import AsyncGenerator
from copy import deepcopy
from typing import Any
from pydantic import Field
from apps.models import LanguageType
from apps.scheduler.call.core import CoreCall
from apps.scheduler.mcp import MCPHost, MCPPlanner, MCPSelector
from apps.schemas.enum_var import CallOutputType
from apps.schemas.mcp import MCPPlanItem
from apps.schemas.scheduler import (
CallError,
CallInfo,
CallOutputChunk,
CallVars,
)
from .schema import (
MCPInput,
MCPMessage,
MCPMessageType,
MCPOutput,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Progress messages emitted around plan generation.
# Outer key is the phase ("START" / "END"), inner key is the output language.
# The "END" templates contain a `{plan_str}` placeholder that is filled in with
# `str.format` using the rendered plan text.
MCP_GENERATE: dict[str, dict[LanguageType, str]] = {
    "START": {
        LanguageType.CHINESE: "[MCP] 开始生成计划...\n\n\n\n",
        LanguageType.ENGLISH: "[MCP] Start generating plan...\n\n\n\n",
    },
    "END": {
        LanguageType.CHINESE: "[MCP] 计划生成完成:\n\n{plan_str}\n\n\n\n",
        LanguageType.ENGLISH: "[MCP] Plan generation completed: \n\n{plan_str}\n\n\n\n",
    },
}
# Progress messages emitted around the final summarization step.
# Outer key is the phase ("START" / "END"), inner key is the output language.
# The "END" templates contain an `{answer}` placeholder that is filled in with
# `str.format` using the generated answer.
# NOTE(review): the Chinese "END" template puts a `---` separator line before
# the answer while the English one does not — confirm whether this is intended.
MCP_SUMMARY: dict[str, dict[LanguageType, str]] = {
    "START": {
        LanguageType.CHINESE: "[MCP] 正在总结任务结果...\n\n",
        LanguageType.ENGLISH: "[MCP] Start summarizing task results...\n\n",
    },
    "END": {
        LanguageType.CHINESE: "[MCP] 任务完成\n\n---\n\n{answer}\n\n",
        LanguageType.ENGLISH: "[MCP] Task summary completed\n\n{answer}\n\n",
    },
}
# Progress messages emitted before and after a single tool invocation.
# Outer key is the phase ("BEGIN" / "END"), inner key is the output language.
# Every template contains a `{tool_name}` placeholder that is filled in with
# `str.format` using the tool's name.
MCP_TOOL: dict[str, dict[LanguageType, str]] = {
    "BEGIN": {
        LanguageType.CHINESE: "[MCP] 正在调用工具 {tool_name}...\n\n",
        LanguageType.ENGLISH: "[MCP] Calling tool {tool_name}...\n\n",
    },
    "END": {
        LanguageType.CHINESE: "[MCP] 工具 {tool_name} 调用完成\n\n",
        LanguageType.ENGLISH: "[MCP] Tool {tool_name} call completed\n\n",
    },
}
class MCP(CoreCall, input_model=MCPInput, output_model=MCPOutput):
    """MCP tool call.

    Runs a three-phase flow against the configured MCP Servers: generate a
    plan for the user's question, execute the plan items one by one, then
    generate a final summarized answer. Progress is streamed as
    ``CallOutputChunk`` objects.
    """

    # IDs of the MCP Servers whose tools may be used (between 1 and 5 entries).
    mcp_list: list[str] = Field(description="MCP Server ID列表", max_length=5, min_length=1)
    # Maximum number of steps; passed to the planner and echoed in the input model.
    max_steps: int = Field(description="最大步骤数", default=20)
    # When True, chunks are plain text; otherwise JSON-serialized MCPMessage data.
    text_output: bool = Field(description="是否将结果以文本形式返回", default=True)
    # Whether the result is returned to the user (not read inside this class).
    to_user: bool = Field(description="是否将结果返回给用户", default=True)

    @classmethod
    def info(cls, language: LanguageType = LanguageType.CHINESE) -> CallInfo:
        """
        Return the name and description of this Call.

        :param language: language in which the description is returned
        :return: name and description of the Call
        :rtype: CallInfo
        :raises KeyError: if ``language`` is neither CHINESE nor ENGLISH
        """
        i18n_info = {
            LanguageType.CHINESE: CallInfo(name="MCP", description="调用MCP Server,执行工具"),
            LanguageType.ENGLISH: CallInfo(name="MCP", description="Call MCP Server to execute tools"),
        }
        return i18n_info[language]

    async def _init(self, call_vars: CallVars) -> MCPInput:
        """
        Initialize the MCP host and collect the available tools.

        Stores the host, the tool list and ``call_vars`` on the instance for
        use by the later phases.

        :param call_vars: variables of the current call (IDs, question, ...)
        :return: input model listing tool names grouped by MCP Server ID
        :rtype: MCPInput
        """
        self._host = MCPHost(
            call_vars.ids.user_id,
            call_vars.ids.task_id,
            self._llm_obj,
            self._sys_vars.language,
        )
        self._tool_list = await self._host.get_tool_list(self.mcp_list)
        self._call_vars = call_vars

        # Group tool names by the MCP Server they belong to.
        tools_by_server = {}
        for tool in self._tool_list:
            tools_by_server.setdefault(tool.mcpId, []).append(tool.toolName)

        # NOTE: "avaliable_tools" (sic) is the keyword this code has always
        # passed to MCPInput, so the spelling is kept as-is.
        return MCPInput(avaliable_tools=tools_by_server, max_steps=self.max_steps)

    async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]:
        """
        Execute the MCP flow: plan, run every plan item, then summarize.

        :param input_data: call input; not used by this implementation
        :return: async generator of output chunks from all three phases
        """
        async for chunk in self._generate_plan():
            yield chunk

        # Iterate over a deep copy so the items handed to the executor are
        # independent of the stored plan, which is used again for the summary.
        for plan_item in deepcopy(self._plan.plans):
            async for chunk in self._execute_plan_item(plan_item):
                yield chunk

        async for chunk in self._generate_answer():
            yield chunk

    async def _generate_plan(self) -> AsyncGenerator[CallOutputChunk, None]:
        """
        Generate the execution plan and store it in ``self._plan``.

        :return: async generator yielding a PLAN_BEGIN chunk, then a PLAN_END
            chunk that carries the rendered plan text and the dumped plan
        """
        language = self._sys_vars.language
        yield self._create_output(MCP_GENERATE["START"][language], MCPMessageType.PLAN_BEGIN)

        # Select the tool(s) for the question, then let the planner build a plan.
        selector = MCPSelector(self._llm_obj)
        top_tool = await selector.select_top_tool(self._call_vars.question, self.mcp_list)
        planner = MCPPlanner(
            self._call_vars.question,
            language=language,
            llm=self._llm_obj,
        )
        self._plan = await planner.create_plan(top_tool, self.max_steps)

        # Render one "[+] ..." entry per plan item, each followed by a blank line.
        plan_str = "\n\n" + "".join(
            f"[+] {plan_item.content}; {plan_item.tool}[{plan_item.instruction}]\n\n"
            for plan_item in self._plan.plans
        )

        yield self._create_output(
            MCP_GENERATE["END"][language].format(plan_str=plan_str),
            MCPMessageType.PLAN_END,
            data=self._plan.model_dump(),
        )

    async def _execute_plan_item(self, plan_item: MCPPlanItem) -> AsyncGenerator[CallOutputChunk, None]:
        """
        Execute a single plan item by calling the tool it names.

        :param plan_item: the plan item to execute
        :return: async generator yielding a TOOL_BEGIN chunk and, after the
            call succeeds, a TOOL_END chunk carrying the tool result
        :raises CallError: if the tool is not in the tool list or the call fails
        """
        # A "Final" item names no tool to call; it produces no output.
        if plan_item.tool == "Final":
            return

        tool = next((tool for tool in self._tool_list if tool.id == plan_item.tool), None)
        if tool is None:
            err = f"[MCP] 工具 {plan_item.tool} 不存在"
            logger.error(err)
            raise CallError(err, data={})

        language = self._sys_vars.language
        yield self._create_output(
            MCP_TOOL["BEGIN"][language].format(tool_name=tool.toolName),
            MCPMessageType.TOOL_BEGIN,
        )

        try:
            result = await self._host.call_tool(tool, plan_item)
        except Exception as e:
            # Any tool failure is logged with its traceback and re-raised as a
            # CallError chained to the original exception.
            err = f"[MCP] 工具 {tool.toolName} 调用失败: {e!s}"
            logger.exception(err)
            raise CallError(err, data={}) from e

        logger.info("[MCP] 工具 %s 调用完成, 结果: %s", tool.toolName, result)
        yield self._create_output(
            MCP_TOOL["END"][language].format(tool_name=tool.toolName),
            MCPMessageType.TOOL_END,
            data={
                "data": result,
            },
        )

    async def _generate_answer(self) -> AsyncGenerator[CallOutputChunk, None]:
        """
        Generate the final summary from the plan and the host's memory.

        :return: async generator yielding a FINISH_BEGIN chunk, then a
            FINISH_END chunk that carries the answer
        """
        language = self._sys_vars.language
        yield self._create_output(
            MCP_SUMMARY["START"][language],
            MCPMessageType.FINISH_BEGIN,
        )

        planner = MCPPlanner(
            self._call_vars.question,
            language=language,
            llm=self._llm_obj,
        )
        answer = await planner.generate_answer(self._plan, await self._host.assemble_memory())

        yield self._create_output(
            MCP_SUMMARY["END"][language].format(answer=answer),
            MCPMessageType.FINISH_END,
            data=MCPOutput(
                message=answer,
            ).model_dump(),
        )

    def _create_output(
        self,
        text: str,
        msg_type: MCPMessageType,
        data: dict[str, Any] | None = None,
    ) -> CallOutputChunk:
        """
        Build an output chunk in the configured output mode.

        :param text: human-readable message text
        :param msg_type: message type; only used in data mode
        :param data: optional structured payload; only used in data mode
        :return: a TEXT chunk holding ``text`` when ``text_output`` is True,
            otherwise a DATA chunk holding a JSON-serialized ``MCPMessage``
        :rtype: CallOutputChunk
        """
        if self.text_output:
            return CallOutputChunk(type=CallOutputType.TEXT, content=text)

        # Data mode: strip the layout newlines and default the payload to {}.
        return CallOutputChunk(type=CallOutputType.DATA, content=MCPMessage(
            msg_type=msg_type,
            message=text.strip(),
            data=data or {},
        ).model_dump_json())